/* Copyright (C) 2014 InfiniDB, Inc.
   Copyright (C) 2016-2021 MariaDB Corporation

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: writeengine.cpp 4737 2013-08-14 20:45:46Z bwilkinson $

/** @writeengine.cpp
 *   A wrapper class for the write engine to write information to files
 */

// XXX: a definition to switch off computations for token columns.
#define	XXX_WRITEENGINE_TOKENS_RANGES_XXX

#include <cmath>
#include <cstdlib>
#include <unistd.h>
#include <boost/scoped_array.hpp>
#include <boost/scoped_ptr.hpp>
#include <unordered_map>
using namespace std;

#include "joblisttypes.h"

#define WRITEENGINEWRAPPER_DLLEXPORT
#include "writeengine.h"
#undef WRITEENGINEWRAPPER_DLLEXPORT

#include "we_convertor.h"
#include "we_log.h"
#include "we_simplesyslog.h"
#include "we_config.h"
#include "we_bulkrollbackmgr.h"
#include "brm.h"
#include "stopwatch.h"
#include "we_colop.h"
#include "we_type.h"

#include "we_colopcompress.h"
#include "we_dctnrycompress.h"
#include "cacheutils.h"
#include "calpontsystemcatalog.h"
#include "we_simplesyslog.h"
using namespace cacheutils;
using namespace logging;
using namespace BRM;
using namespace execplan;
#include "IDBDataFile.h"
#include "IDBPolicy.h"
#include "MonitorProcMem.h"
using namespace idbdatafile;
#include "dataconvert.h"
#include "string_prefixes.h"

#include "mcs_decimal.h"

namespace WriteEngine
//#define PROFILE 1

#define RETURN_ON_ERROR_REPORT(statement)                                                              \
  do                                                                                                   \
  {                                                                                                    \
    int rc = (statement);                                                                              \
    if (rc != NO_ERROR)                                                                                \
    {                                                                                                  \
      cerr << "failed at " << __LINE__ << "function " << __FUNCTION__ << ", statement '" << #statement \
           << "', rc " << rc << endl;                                                                  \
      return rc;                                                                                       \
    }                                                                                                  \
  } while (0)

{
StopWatch timer;
using OidToIdxMap = std::unordered_map<OID, ColStructList::size_type>;

/**@brief WriteEngineWrapper Constructor
 */
WriteEngineWrapper::WriteEngineWrapper() : m_opType(NOOP)
{
  m_colOp[UN_COMPRESSED_OP] = new ColumnOpCompress0;
  m_dctnry[UN_COMPRESSED_OP] = new DctnryCompress0;

  m_colOp[COMPRESSED_OP_1] = new ColumnOpCompress1(/*comressionType=*/1);
  m_dctnry[COMPRESSED_OP_1] = new DctnryCompress1(/*compressionType=*/1);

  m_colOp[COMPRESSED_OP_2] = new ColumnOpCompress1(/*comressionType=*/3);
  m_dctnry[COMPRESSED_OP_2] = new DctnryCompress1(/*compressionType=*/3);
}

WriteEngineWrapper::WriteEngineWrapper(const WriteEngineWrapper& rhs) : m_opType(rhs.m_opType)
{
  m_colOp[UN_COMPRESSED_OP] = new ColumnOpCompress0;
  m_dctnry[UN_COMPRESSED_OP] = new DctnryCompress0;

  m_colOp[COMPRESSED_OP_1] = new ColumnOpCompress1(/*compressionType=*/1);
  m_dctnry[COMPRESSED_OP_1] = new DctnryCompress1(/*compressionType=*/1);

  m_colOp[COMPRESSED_OP_2] = new ColumnOpCompress1(/*compressionType=*/3);
  m_dctnry[COMPRESSED_OP_2] = new DctnryCompress1(/*compressionType=*/3);
}

/**@brief WriteEngineWrapper Constructor
 */
WriteEngineWrapper::~WriteEngineWrapper()
{
  delete m_colOp[UN_COMPRESSED_OP];
  delete m_dctnry[UN_COMPRESSED_OP];

  delete m_colOp[COMPRESSED_OP_1];
  delete m_dctnry[COMPRESSED_OP_1];

  delete m_colOp[COMPRESSED_OP_2];
  delete m_dctnry[COMPRESSED_OP_2];
}

/**@brief Perform upfront initialization
 */
/* static */ void WriteEngineWrapper::init(unsigned subSystemID)
{
  SimpleSysLog::instance()->setLoggingID(logging::LoggingID(subSystemID));
  Config::initConfigCache();
  BRMWrapper::getInstance();

  // Bug 5415 Add HDFS MemBuffer vs. FileBuffer decision logic.
  config::Config* cf = config::Config::makeConfig();
  //--------------------------------------------------------------------------
  // Memory overload protection. This setting will cause the process to die should
  // it, by itself, consume maxPct of total memory. Monitored in MonitorProcMem.
  // Only used at the express direction of Field Support.
  //--------------------------------------------------------------------------
  int maxPct = 0;  // disable by default
  string strMaxPct = cf->getConfig("WriteEngine", "MaxPct");

  if (strMaxPct.length() != 0)
    maxPct = cf->uFromText(strMaxPct);

  //--------------------------------------------------------------------------
  // MemoryCheckPercent. This controls at what percent of total memory be consumed
  // by all processes before we switch from HdfsRdwrMemBuffer to HdfsRdwrFileBuffer.
  // This is only used in Hdfs installations.
  //--------------------------------------------------------------------------
  int checkPct = 95;
  string strCheckPct = cf->getConfig("SystemConfig", "MemoryCheckPercent");

  if (strCheckPct.length() != 0)
    checkPct = cf->uFromText(strCheckPct);

  //--------------------------------------------------------------------------
  // If we're either HDFS, or maxPct is turned on, start the monitor thread.
  // Otherwise, we don't need it, so don't waste the resources.
  //--------------------------------------------------------------------------
  if (maxPct > 0 || IDBPolicy::useHdfs())
  {
    new boost::thread(utils::MonitorProcMem(maxPct, checkPct, subSystemID));
  }
}

/*@brief checkValid --Check input parameters are valid
 */
/***********************************************************
 * DESCRIPTION:
 *    Check input parameters are valid
 * PARAMETERS:
 *    colStructList - column struct list
 *    colValueList - column value list
 *    ridList - rowid list
 * RETURN:
 *    NO_ERROR if success
 *    others if something wrong in the checking process
 ***********************************************************/
int WriteEngineWrapper::checkValid(const TxnID& txnid, const ColStructList& colStructList,
                                   const ColValueList& colValueList, const RIDList& ridList) const
{
  ColTupleList curTupleList;
  ColStructList::size_type structListSize;
  ColValueList::size_type valListSize;
  ColTupleList::size_type totalRow;

  if (colStructList.size() == 0)
    return ERR_STRUCT_EMPTY;

  structListSize = colStructList.size();
  valListSize = colValueList.size();

  if (structListSize != valListSize)
    return ERR_STRUCT_VALUE_NOT_MATCH;

  for (ColValueList::size_type i = 0; i < valListSize; i++)
  {
    curTupleList = static_cast<ColTupleList>(colValueList[i]);
    totalRow = curTupleList.size();

    if (ridList.size() > 0)
    {
      if (totalRow != ridList.size())
        return ERR_ROWID_VALUE_NOT_MATCH;
    }

  }  // end of for (int i = 0;

  return NO_ERROR;
}

/*@brief findSmallestColumn --Find the smallest column for this table
 */
/***********************************************************
 * DESCRIPTION:
 *    Find the smallest column for this table
 * PARAMETERS:
 *    lowColLen - returns smallest column width
 *    colId - returns smallest column id
 *    colStructList - column struct list
 * RETURN:
 *  void
 ***********************************************************/
void WriteEngineWrapper::findSmallestColumn(uint32_t& colId, ColStructList colStructList)
// MCOL-1675: find the smallest column width to calculate the RowID from so
// that all HWMs will be incremented by this operation
{
  int32_t lowColLen = 8192;
  for (uint32_t colIt = 0; colIt < colStructList.size(); colIt++)
  {
    if (colStructList[colIt].colWidth < lowColLen)
    {
      colId = colIt;
      lowColLen = colStructList[colId].colWidth;
      if (lowColLen == 1)
      {
        break;
      }
    }
  }
}

/** @brief Fetch values from arrays into VT-types references casting arrays to (element type) ET-typed arrays.
 *
 * There might be two arrays: one from which we write to buffer and one with old values written before.
 * One of these arrays can be absent because of, well, physical absence. Insertion does not have
 * values written before and deletion does not have values to write to.
 */
template <typename VT, typename ET>
void fetchNewOldValues(VT& value, VT& oldValue, const void* array, const void* oldArray, size_t i,
                       size_t totalNewRow)
{
  const ET* eArray = static_cast<const ET*>(array);
  const ET* oldEArray = static_cast<const ET*>(oldArray);
  if (eArray)
  {
    value = eArray[i < totalNewRow ? i : 0];
  }
  if (oldEArray)
  {
    oldValue = oldEArray[i];
  }
}

static bool updateBigRangeCheckForInvalidity(ExtCPInfo* maxMin, int128_t value, int128_t oldValue,
                                             const void* valArrayVoid, const void* oldValArrayVoid)
{
  if (!oldValArrayVoid)
  {  // insertion. we can update range directly. range will not become invalid here.
    maxMin->fCPInfo.bigMax =
        std::max(maxMin->fCPInfo.bigMax,
                 value);  // we update big range because int columns can be associated with decimals.
    maxMin->fCPInfo.bigMin = std::min(maxMin->fCPInfo.bigMin, value);
  }
  else if (!valArrayVoid)
  {  // deletion. we need to check old value only, is it on (or outside) range boundary.
    if (oldValue >= maxMin->fCPInfo.bigMax || oldValue <= maxMin->fCPInfo.bigMin)
    {
      maxMin->toInvalid();
      return true;  // no point working further.
    }
  }
  else if (valArrayVoid && oldValArrayVoid)
  {  // update. we need to check boundaries as in deletion and extend as in insertion.
     // check boundaries as in deletion accounting for possible extension.
    if ((oldValue <= maxMin->fCPInfo.bigMin && value > oldValue) ||
        (oldValue >= maxMin->fCPInfo.bigMax && value < oldValue))
    {  // we are overwriting value on the boundary with value that does not preserve or extend range.
      maxMin->toInvalid();
      return true;
    }
    maxMin->fCPInfo.bigMax = std::max(maxMin->fCPInfo.bigMax, value);
    maxMin->fCPInfo.bigMin = std::min(maxMin->fCPInfo.bigMin, value);
  }
  return false;
}

template <typename InternalType>
bool updateRangeCheckForInvalidity(ExtCPInfo* maxMin, InternalType value, InternalType oldValue,
                                   const void* valArrayVoid, const void* oldValArrayVoid)
{
  if (!oldValArrayVoid)
  {  // insertion. we can update range directly. range will not become invalid here.
    maxMin->fCPInfo.bigMax = std::max(
        maxMin->fCPInfo.bigMax,
        (int128_t)value);  // we update big range because int columns can be associated with decimals.
    maxMin->fCPInfo.bigMin = std::min(maxMin->fCPInfo.bigMin, (int128_t)value);
    maxMin->fCPInfo.max = std::max((InternalType)maxMin->fCPInfo.max, value);
    maxMin->fCPInfo.min = std::min((InternalType)maxMin->fCPInfo.min, value);
  }
  else if (!valArrayVoid)
  {  // deletion. we need to check old value only, is it on (or outside) range boundary.
    if (oldValue >= (InternalType)maxMin->fCPInfo.max || oldValue <= (InternalType)maxMin->fCPInfo.min)
    {
      maxMin->toInvalid();
      return true;  // no point working further.
    }
  }
  else if (valArrayVoid && oldValArrayVoid)
  {  // update. we need to check boundaries as in deletion and extend as in insertion.
     // check boundaries as in deletion accounting for possible extension.
    if ((oldValue <= (InternalType)maxMin->fCPInfo.min && value > oldValue) ||
        (oldValue >= (InternalType)maxMin->fCPInfo.max && value < oldValue))
    {  // we are overwriting value on the boundary with value that does not preserve or extend range.
      maxMin->toInvalid();
      return true;
    }
    maxMin->fCPInfo.bigMax = std::max(maxMin->fCPInfo.bigMax, (int128_t)value);
    maxMin->fCPInfo.bigMin = std::min(maxMin->fCPInfo.bigMin, (int128_t)value);
    maxMin->fCPInfo.max = std::max((InternalType)maxMin->fCPInfo.max, value);
    maxMin->fCPInfo.min = std::min((InternalType)maxMin->fCPInfo.min, value);
  }
  return false;
}

/**
 * There can be case with missing valArray (delete), missing oldValArray (insert) and when
 * both arrays are present (update).
 */
void WriteEngineWrapper::updateMaxMinRange(const size_t totalNewRow, const size_t totalOldRow,
                                           const execplan::CalpontSystemCatalog::ColType& cscColType,
                                           const ColType colType, const void* valArrayVoid,
                                           const void* oldValArrayVoid, ExtCPInfo* maxMin,
                                           bool canStartWithInvalidRange)
{
  if (!maxMin)
  {
    return;
  }
  if (colType == WR_CHAR)
  {
    maxMin->toInvalid();  // simple and wrong solution.
    return;
  }
  bool isUnsigned = false;  // TODO: should change with type.
  switch (colType)
  {
    case WR_UBYTE:
    case WR_USHORT:
    case WR_UINT:
    case WR_ULONGLONG:
    case WR_CHAR:
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
    case WR_TOKEN:
#endif
    {
      isUnsigned = true;
      break;
    }
    default:  // WR_BINARY is signed.
      break;
  }
  if (!canStartWithInvalidRange)
  {
    // check if range is invalid, we can't update it.
    if (maxMin->isInvalid())
    {
      return;
    }
  }
  if (colType == WR_CHAR)
  {
    idbassert(MAX_COLUMN_BOUNDARY == sizeof(uint64_t));  // have to check that - code below depends on that.
    if (!maxMin->isInvalid())
    {
      maxMin->fromToChars();
    }
  }
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
  if (colType == WR_TOKEN)
  {
    oldValArrayVoid = nullptr;  // no old values for tokens, sadly.
    valArrayVoid = (void*)maxMin->stringsPrefixes();
  }
#endif
  size_t i;
  for (i = 0; i < totalOldRow; i++)
  {
    int64_t value = 0, oldValue = 0;
    uint64_t uvalue = 0, oldUValue = 0;
    int128_t bvalue = 0, oldBValue = 0;

    // fetching. May not assign value or oldValue variables when array is not present.
    switch (colType)
    {
      case WR_BYTE:
      {
        fetchNewOldValues<int64_t, int8_t>(value, oldValue, valArrayVoid, oldValArrayVoid, i, totalNewRow);
        break;
      }
      case WR_UBYTE:
      {
        fetchNewOldValues<uint64_t, uint8_t>(uvalue, oldUValue, valArrayVoid, oldValArrayVoid, i,
                                             totalNewRow);
        break;
      }
      case WR_SHORT:
      {
        fetchNewOldValues<int64_t, int16_t>(value, oldValue, valArrayVoid, oldValArrayVoid, i, totalNewRow);
        break;
      }
      case WR_USHORT:
      {
        fetchNewOldValues<uint64_t, uint16_t>(uvalue, oldUValue, valArrayVoid, oldValArrayVoid, i,
                                              totalNewRow);
        break;
      }
      case WR_MEDINT:
      case WR_INT:
      {
        fetchNewOldValues<int64_t, int>(value, oldValue, valArrayVoid, oldValArrayVoid, i, totalNewRow);
        break;
      }
      case WR_UMEDINT:
      case WR_UINT:
      {
        fetchNewOldValues<uint64_t, unsigned int>(uvalue, oldUValue, valArrayVoid, oldValArrayVoid, i,
                                                  totalNewRow);
        break;
      }
      case WR_LONGLONG:
      {
        fetchNewOldValues<int64_t, int64_t>(value, oldValue, valArrayVoid, oldValArrayVoid, i, totalNewRow);
        break;
      }
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
      case WR_TOKEN:
#endif
      case WR_ULONGLONG:
      {
        fetchNewOldValues<uint64_t, uint64_t>(uvalue, oldUValue, valArrayVoid, oldValArrayVoid, i,
                                              totalNewRow);
        break;
      }
      case WR_BINARY:
      {
        fetchNewOldValues<int128_t, int128_t>(bvalue, oldBValue, valArrayVoid, oldValArrayVoid, i,
                                              totalNewRow);
        break;
      }
      case WR_CHAR:
      {
        fetchNewOldValues<uint64_t, uint64_t>(uvalue, oldUValue, valArrayVoid, oldValArrayVoid, i, totalNewRow);
        // for characters (strings, actually), we fetched then in LSB order, on x86, at the very least.
        // this means most significant byte of the string, which is first, is now in LSB of uvalue/oldValue.
        // we must perform a conversion.
        uvalue = uint64ToStr(uvalue);
        oldUValue = uint64ToStr(oldUValue);
        break;
      }
      default: idbassert_s(0, "unknown WR type tag"); return;
    }
    if (maxMin->isBinaryColumn())
    {  // special case of wide decimals. They fit into int128_t range so we do not care about signedness.
      if (updateBigRangeCheckForInvalidity(maxMin, bvalue, oldBValue, valArrayVoid, oldValArrayVoid))
      {
        return;
      }
    }
    else if (isUnsigned)
    {
      if (updateRangeCheckForInvalidity(maxMin, uvalue, oldUValue, valArrayVoid, oldValArrayVoid))
      {
        return;
      }
    }
    else
    {
      if (updateRangeCheckForInvalidity(maxMin, value, oldValue, valArrayVoid, oldValArrayVoid))
      {
        return;
      }
    }
  }
  // the range will be kept.
  // to handle character columns properly we need to convert range values back to strings.
  if (colType == WR_CHAR)
  {
    maxMin->fromToChars();
  }
}
/*@convertValArray - Convert interface values to internal values
 */
/***********************************************************
 * DESCRIPTION:
 *    Convert interface values to internal values
 * PARAMETERS:
 *    cscColType -  CSC ColType struct list
 *    colStructList - column struct list
 *    colValueList - column value list
 * RETURN:
 *    none
 *    valArray - output value array
 *    nullArray - output null flag array
 ***********************************************************/
void WriteEngineWrapper::convertValArray(const size_t totalRow,
                                         const CalpontSystemCatalog::ColType& cscColType,
                                         const ColType colType, ColTupleList& curTupleList, void* valArray,
                                         bool bFromList)
{
  ColTuple curTuple;
  ColTupleList::size_type i;

  if (bFromList)
  {
    for (i = 0; i < curTupleList.size(); i++)
    {
      curTuple = curTupleList[i];
      convertValue(cscColType, colType, valArray, i, curTuple.data, true);
    }
  }
  else
  {
    for (i = 0; i < totalRow; i++)
    {
      convertValue(cscColType, colType, valArray, i, curTuple.data, false);
      curTupleList.push_back(curTuple);
    }
  }
}

/*
 * @brief Convert column value to its internal representation
 */
void WriteEngineWrapper::convertValue(const execplan::CalpontSystemCatalog::ColType& cscColType,
                                      ColType colType, void* value, boost::any& data)
{
  string curStr;
  int size;

  switch (colType)
  {
    case WriteEngine::WR_INT:
    case WriteEngine::WR_MEDINT:
      if (data.type() == typeid(int))
      {
        int val = boost::any_cast<int>(data);
        size = sizeof(int);
        memcpy(value, &val, size);
      }
      else
      {
        uint32_t val = boost::any_cast<uint32_t>(data);
        size = sizeof(uint32_t);
        memcpy(value, &val, size);
      }

      break;

    case WriteEngine::WR_UINT:
    case WriteEngine::WR_UMEDINT:
    {
      uint32_t val = boost::any_cast<uint32_t>(data);
      size = sizeof(uint32_t);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_VARBINARY:  // treat same as char for now
    case WriteEngine::WR_CHAR:
    case WriteEngine::WR_BLOB:
    case WriteEngine::WR_TEXT:
      curStr = boost::any_cast<string>(data);

      if ((int)curStr.length() > MAX_COLUMN_BOUNDARY)
        curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY);

      memcpy(value, curStr.c_str(), curStr.length());

      break;

    case WriteEngine::WR_FLOAT:
    {
      float val = boost::any_cast<float>(data);

      // N.B.There is a bug in boost::any or in gcc where, if you store a nan, you will get back a nan,
      // but not necessarily the same bits that you put in. This only seems to be for float (double seems
      // to work).
      if (isnan(val))
      {
        uint32_t ti = joblist::FLOATNULL;
        float* tfp = (float*)&ti;
        val = *tfp;
      }

      size = sizeof(float);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_DOUBLE:
    {
      double val = boost::any_cast<double>(data);
      size = sizeof(double);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_SHORT:
    {
      short val = boost::any_cast<short>(data);
      size = sizeof(short);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_USHORT:
    {
      uint16_t val = boost::any_cast<uint16_t>(data);
      size = sizeof(uint16_t);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_BYTE:
    {
      char val = boost::any_cast<char>(data);
      size = sizeof(char);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_UBYTE:
    {
      uint8_t val = boost::any_cast<uint8_t>(data);
      size = sizeof(uint8_t);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_LONGLONG:
      if (data.type() == typeid(long long))
      {
        long long val = boost::any_cast<long long>(data);
        size = sizeof(long long);
        memcpy(value, &val, size);
      }
      else if (data.type() == typeid(long))
      {
        long val = boost::any_cast<long>(data);
        size = sizeof(long);
        memcpy(value, &val, size);
      }
      else
      {
        uint64_t val = boost::any_cast<uint64_t>(data);
        size = sizeof(uint64_t);
        memcpy(value, &val, size);
      }

      break;

    case WriteEngine::WR_ULONGLONG:
    {
      uint64_t val = boost::any_cast<uint64_t>(data);
      size = sizeof(uint64_t);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_TOKEN:
    {
      Token val = boost::any_cast<Token>(data);
      size = sizeof(Token);
      memcpy(value, &val, size);
    }
    break;

    case WriteEngine::WR_BINARY:
    {
      size = cscColType.colWidth;
      int128_t val = boost::any_cast<int128_t>(data);
      memcpy(value, &val, size);
    }
    break;

  }  // end of switch (colType)
} /*@convertValue -  The base for converting values */

/***********************************************************
 * DESCRIPTION:
 *    The base for converting values
 * PARAMETERS:
 *    colType - data type
 *    pos - array position
 *    data - value
 * RETURN:
 *    none
 ***********************************************************/
void WriteEngineWrapper::convertValue(const CalpontSystemCatalog::ColType& cscColType, const ColType colType,
                                      void* valArray, const size_t pos, boost::any& data, bool fromList)
{
  string curStr;

  if (fromList)
  {
    switch (colType)
    {
      case WriteEngine::WR_INT:
      case WriteEngine::WR_MEDINT:
        // here we assign value to an array element and update range.
        if (data.type() == typeid(long))
          ((int*)valArray)[pos] = static_cast<int>(boost::any_cast<long>(data));
        else if (data.type() == typeid(int))
          ((int*)valArray)[pos] = boost::any_cast<int>(data);
        else
        {  // this interesting part is for magic values like NULL or EMPTY (marks deleted elements).
          // we will not put these into range.
          ((int*)valArray)[pos] = boost::any_cast<uint32_t>(data);
        }

        break;

      case WriteEngine::WR_UINT:
      case WriteEngine::WR_UMEDINT: ((uint32_t*)valArray)[pos] = boost::any_cast<uint32_t>(data); break;

      case WriteEngine::WR_VARBINARY:  // treat same as char for now
      case WriteEngine::WR_CHAR:
      case WriteEngine::WR_BLOB:
      case WriteEngine::WR_TEXT:
        curStr = boost::any_cast<string>(data);

        if ((int)curStr.length() > MAX_COLUMN_BOUNDARY)
          curStr = curStr.substr(0, MAX_COLUMN_BOUNDARY);

        memcpy((char*)valArray + pos * MAX_COLUMN_BOUNDARY, curStr.c_str(), curStr.length());
        break;

        //            case WriteEngine::WR_LONG :   ((long*)valArray)[pos] =
        //            boost::any_cast<long>(curTuple.data);
        //                                          break;
      case WriteEngine::WR_FLOAT:
        ((float*)valArray)[pos] = boost::any_cast<float>(data);

        if (isnan(((float*)valArray)[pos]))
        {
          uint32_t ti = joblist::FLOATNULL;
          float* tfp = (float*)&ti;
          ((float*)valArray)[pos] = *tfp;
        }

        break;

      case WriteEngine::WR_DOUBLE: ((double*)valArray)[pos] = boost::any_cast<double>(data); break;

      case WriteEngine::WR_SHORT: ((short*)valArray)[pos] = boost::any_cast<short>(data); break;

      case WriteEngine::WR_USHORT:
        ((uint16_t*)valArray)[pos] = boost::any_cast<uint16_t>(data);
        break;

        //            case WriteEngine::WR_BIT:     ((bool*)valArray)[pos] = boost::any_cast<bool>(data);
        //                                          break;
      case WriteEngine::WR_BYTE: ((char*)valArray)[pos] = boost::any_cast<char>(data); break;

      case WriteEngine::WR_UBYTE: ((uint8_t*)valArray)[pos] = boost::any_cast<uint8_t>(data); break;

      case WriteEngine::WR_LONGLONG:
        if (data.type() == typeid(long long))
          ((long long*)valArray)[pos] = boost::any_cast<long long>(data);
        else if (data.type() == typeid(long))
          ((long long*)valArray)[pos] = (long long)boost::any_cast<long>(data);
        else
          ((long long*)valArray)[pos] = boost::any_cast<uint64_t>(data);

        break;

      case WriteEngine::WR_ULONGLONG: ((uint64_t*)valArray)[pos] = boost::any_cast<uint64_t>(data); break;

      case WriteEngine::WR_TOKEN:
        ((Token*)valArray)[pos] = boost::any_cast<Token>(data);
        break;

      case WriteEngine::WR_BINARY:
        size_t size = cscColType.colWidth;
        int128_t val = boost::any_cast<int128_t>(data);
        memcpy((uint8_t*)valArray + pos * size, &val, size);
        break;
    }  // end of switch (colType)
  }
  else
  {
    switch (colType)
    {
      case WriteEngine::WR_INT:
      case WriteEngine::WR_MEDINT: data = ((int*)valArray)[pos]; break;

      case WriteEngine::WR_UINT:
      case WriteEngine::WR_UMEDINT: data = ((uint64_t*)valArray)[pos]; break;

      case WriteEngine::WR_VARBINARY:  // treat same as char for now
      case WriteEngine::WR_CHAR:
      case WriteEngine::WR_BLOB:
      case WriteEngine::WR_TEXT:
        char tmp[10];
        memcpy(tmp, (char*)valArray + pos * 8, 8);
        curStr = tmp;
        data = curStr;
        break;

        //            case WriteEngine::WR_LONG :   ((long*)valArray)[pos] =
        //            boost::any_cast<long>(curTuple.data);
        //                                          break;
      case WriteEngine::WR_FLOAT: data = ((float*)valArray)[pos]; break;

      case WriteEngine::WR_DOUBLE: data = ((double*)valArray)[pos]; break;

      case WriteEngine::WR_SHORT: data = ((short*)valArray)[pos]; break;

      case WriteEngine::WR_USHORT:
        data = ((uint16_t*)valArray)[pos];
        break;

        //            case WriteEngine::WR_BIT:     data = ((bool*)valArray)[pos];
        //                                          break;
      case WriteEngine::WR_BYTE: data = ((char*)valArray)[pos]; break;

      case WriteEngine::WR_UBYTE: data = ((uint8_t*)valArray)[pos]; break;

      case WriteEngine::WR_LONGLONG: data = ((long long*)valArray)[pos]; break;

      case WriteEngine::WR_ULONGLONG: data = ((uint64_t*)valArray)[pos]; break;

      case WriteEngine::WR_TOKEN: data = ((Token*)valArray)[pos]; break;

      case WriteEngine::WR_BINARY: data = ((int128_t*)valArray)[pos]; break;

    }  // end of switch (colType)
  }    // end of if
}

/*@createColumn -  Create column files, including data and bitmap files
 */
/***********************************************************
 * DESCRIPTION:
 *    Create column files, including data and bitmap files
 * PARAMETERS:
 *    dataOid - column data file id
 *    bitmapOid - column bitmap file id
 *    colWidth - column width
 *    dbRoot   - DBRoot where file is to be located
 *    partition - Starting partition number for segment file path
 *     compressionType - compression type
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_EXIST if file exists
 *    ERR_FILE_CREATE if something wrong in creating the file
 ***********************************************************/
int WriteEngineWrapper::createColumn(const TxnID& txnid, const OID& dataOid,
                                     const CalpontSystemCatalog::ColDataType dataType, int dataWidth,
                                     uint16_t dbRoot, uint32_t partition, int compressionType)
{
  int rc;
  Column curCol;

  int compress_op = op(compressionType);
  m_colOp[compress_op]->initColumn(curCol);
  m_colOp[compress_op]->findTypeHandler(dataWidth, dataType);

  rc = m_colOp[compress_op]->createColumn(curCol, 0, dataWidth, dataType, WriteEngine::WR_CHAR, (FID)dataOid,
                                          dbRoot, partition);

  // This is optional, however, it's recommended to do so to free heap
  // memory if assigned in the future
  m_colOp[compress_op]->clearColumn(curCol);
  std::map<FID, FID> oids;

  if (rc == NO_ERROR)
    rc = flushDataFiles(NO_ERROR, txnid, oids);

  if (rc != NO_ERROR)
  {
    return rc;
  }

  RETURN_ON_ERROR(BRMWrapper::getInstance()->setLocalHWM(dataOid, partition, 0, 0));
  // @bug 281 : fix for bug 281 - Add flush VM cache to clear all write buffer
  // flushVMCache();
  return rc;
}

// BUG931
/**
 * @brief Fill column with default values
 */
int WriteEngineWrapper::fillColumn(const TxnID& txnid, const OID& dataOid,
                                   const CalpontSystemCatalog::ColType& colType, ColTuple defaultVal,
                                   const OID& refColOID,
                                   const CalpontSystemCatalog::ColDataType refColDataType, int refColWidth,
                                   int refCompressionType, bool isNULL, int compressionType,
                                   const string& defaultValStr, const OID& dictOid, bool autoincrement)
{
  int rc = NO_ERROR;
  Column newCol;
  Column refCol;
  ColType newColType;
  ColType refColType;
  boost::scoped_array<char> defVal(new char[datatypes::MAXDECIMALWIDTH]);
  ColumnOp* colOpNewCol = m_colOp[op(compressionType)];
  ColumnOp* refColOp = m_colOp[op(refCompressionType)];
  Dctnry* dctnry = m_dctnry[op(compressionType)];
  colOpNewCol->initColumn(newCol);
  refColOp->initColumn(refCol);
  uint16_t dbRoot = 1;  // not to be used
  int newDataWidth = colType.colWidth;
  // Convert HWM of the reference column for the new column
  // Bug 1703,1705
  bool isToken = false;

  if (((colType.colDataType == CalpontSystemCatalog::VARCHAR) && (colType.colWidth > 7)) ||
      ((colType.colDataType == CalpontSystemCatalog::CHAR) && (colType.colWidth > 8)) ||
      (colType.colDataType == CalpontSystemCatalog::VARBINARY) ||
      (colType.colDataType == CalpontSystemCatalog::BLOB) ||
      (colType.colDataType == CalpontSystemCatalog::TEXT))
  {
    isToken = true;
  }

  Convertor::convertColType(colType.colDataType, colType.colWidth, newColType, isToken);

  // WIP
  // replace with isDictCol
  if (((refColDataType == CalpontSystemCatalog::VARCHAR) && (refColWidth > 7)) ||
      ((refColDataType == CalpontSystemCatalog::CHAR) && (refColWidth > 8)) ||
      (refColDataType == CalpontSystemCatalog::VARBINARY) ||
      (colType.colDataType == CalpontSystemCatalog::BLOB) ||
      (colType.colDataType == CalpontSystemCatalog::TEXT))
  {
    isToken = true;
  }

  newDataWidth = colOpNewCol->getCorrectRowWidth(colType.colDataType, colType.colWidth);
  // MCOL-1347 CS doubles the width for ALTER TABLE..ADD COLUMN
  if (colType.colWidth < 4 && colType.colDataType == CalpontSystemCatalog::VARCHAR)
  {
    newDataWidth >>= 1;
  }

  Convertor::convertColType(refColDataType, refColWidth, refColType, isToken);
  refColOp->setColParam(refCol, 0, refColOp->getCorrectRowWidth(refColDataType, refColWidth), refColDataType,
                        refColType, (FID)refColOID, refCompressionType, dbRoot);
  colOpNewCol->setColParam(newCol, 0, newDataWidth, colType.colDataType, newColType, (FID)dataOid,
                           compressionType, dbRoot);
  // refColOp and colOpNewCol are the same ptr.
  colOpNewCol->findTypeHandler(newDataWidth, colType.colDataType);

  int size = sizeof(Token);

  if (newColType == WriteEngine::WR_TOKEN)
  {
    if (isNULL)
    {
      Token nullToken;
      memcpy(defVal.get(), &nullToken, size);
    }
    // Tokenization is done when we create dictionary file
  }
  else
  {
    // WIP
    convertValue(colType, newColType, defVal.get(), defaultVal.data);
  }

  if (rc == NO_ERROR)
    rc = colOpNewCol->fillColumn(txnid, newCol, refCol, defVal.get(), dctnry, refColOp, dictOid,
                                 colType.colWidth, defaultValStr, autoincrement);

  // flushing files is in colOp->fillColumn()

  return rc;
}

// TODO: Get rid of this
void emptyValueToAny(boost::any* any, const uint8_t* emptyValue, int colWidth)
{
  switch (colWidth)
  {
    case 16:
      *any = *(uint128_t*)emptyValue;
      break;
    case 8:
      *any = *(uint64_t*)emptyValue;
      break;
    case 4:
      *any = *(uint32_t*)emptyValue;
      break;
    case 2:
      *any = *(uint16_t*)emptyValue;
      break;
    default:
      *any = *emptyValue;
  }
}


int WriteEngineWrapper::deleteRow(const TxnID& txnid, const vector<CSCTypesList>& colExtentsColType,
                                  vector<ColStructList>& colExtentsStruct, vector<void*>& colOldValueList,
                                  vector<RIDList>& ridLists, const int32_t tableOid, bool hasAUXCol)
{
  ColTuple curTuple;
  ColStruct curColStruct;
  CalpontSystemCatalog::ColType cscColType;
  DctnryStruct dctnryStruct;
  ColValueList colValueList;
  ColTupleList curTupleList;
  DctnryStructList dctnryStructList;
  DctnryValueList dctnryValueList;
  ColStructList colStructList;
  CSCTypesList cscColTypeList;
  int rc;
  string tmpStr("");
  vector<DctnryStructList> dctnryExtentsStruct;

  if (colExtentsStruct.size() == 0 || ridLists.size() == 0)
    return ERR_STRUCT_EMPTY;

  // set transaction id
  setTransId(txnid);
  unsigned numExtents = colExtentsStruct.size();

  for (unsigned extent = 0; extent < numExtents; extent++)
  {
    colStructList = colExtentsStruct[extent];
    cscColTypeList = colExtentsColType[extent];

    for (ColStructList::size_type i = 0; i < colStructList.size(); i++)
    {
      curTupleList.clear();
      curColStruct = colStructList[i];
      cscColType = cscColTypeList[i];
      Convertor::convertColType(&curColStruct);

      const uint8_t* emptyVal = m_colOp[op(curColStruct.fCompressionType)]->getEmptyRowValue(
          curColStruct.colDataType, curColStruct.colWidth);

      emptyValueToAny(&curTuple.data, emptyVal, curColStruct.colWidth);

      curTupleList.push_back(curTuple);
      colValueList.push_back(curTupleList);

      dctnryStruct.dctnryOid = 0;
      dctnryStruct.fColPartition = curColStruct.fColPartition;
      dctnryStruct.fColSegment = curColStruct.fColSegment;
      dctnryStruct.fColDbRoot = curColStruct.fColDbRoot;
      dctnryStruct.columnOid = colStructList[i].dataOid;
      dctnryStructList.push_back(dctnryStruct);

      DctnryTuple dctnryTuple;
      DctColTupleList dctColTuples;
      dctnryTuple.sigValue = (unsigned char*)tmpStr.c_str();
      dctnryTuple.sigSize = tmpStr.length();
      dctnryTuple.isNull = true;
      dctColTuples.push_back(dctnryTuple);
      dctnryValueList.push_back(dctColTuples);
    }

    dctnryExtentsStruct.push_back(dctnryStructList);
  }

  // unfortunately I don't have a better way to instruct without passing too many parameters
  m_opType = DELETE;
  rc = updateColumnRec(txnid, colExtentsColType, colExtentsStruct, colValueList, colOldValueList, ridLists,
                       dctnryExtentsStruct, dctnryValueList, tableOid, hasAUXCol);
  m_opType = NOOP;

  return rc;
}

inline void allocateValArray(void*& valArray, ColTupleList::size_type totalRow, ColType colType, int colWidth)
{
  // MCS allocates 8 bytes even for CHARs smaller then 8 bytes.
  switch (colType)
  {
    case WriteEngine::WR_VARBINARY:  // treat same as char for now
    case WriteEngine::WR_CHAR:
    case WriteEngine::WR_BLOB:
    case WriteEngine::WR_TEXT: valArray = calloc(totalRow * MAX_COLUMN_BOUNDARY, sizeof(char)); break;
    case WriteEngine::WR_TOKEN: valArray = calloc(totalRow, sizeof(Token)); break;
    default: valArray = calloc(totalRow, colWidth); break;
  }
}

int WriteEngineWrapper::deleteBadRows(const TxnID& txnid, ColStructList& colStructs, RIDList& ridList,
                                      DctnryStructList& dctnryStructList)
{
  /*  Need to scan all files including dictionary store files to check whether there is any bad chunks
   *
   */
  int rc = 0;
  Column curCol;
  void* valArray = NULL;

  m_opType = DELETE;

  for (unsigned i = 0; i < colStructs.size(); i++)
  {
    ColumnOp* colOp = m_colOp[op(colStructs[i].fCompressionType)];
    unsigned needFixFiles = colStructs[i].tokenFlag ? 2 : 1;
    colOp->initColumn(curCol);

    for (unsigned j = 0; j < needFixFiles; j++)
    {
      if (j == 0)
      {
        colOp->setColParam(curCol, 0, colStructs[i].colWidth, colStructs[i].colDataType,
                           colStructs[i].colType, colStructs[i].dataOid, colStructs[i].fCompressionType,
                           colStructs[i].fColDbRoot, colStructs[i].fColPartition, colStructs[i].fColSegment);
        colOp->findTypeHandler(colStructs[i].colWidth, colStructs[i].colDataType);

        string segFile;
        rc = colOp->openColumnFile(curCol, segFile, true, IO_BUFF_SIZE);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)  // If openFile fails, disk error or header error is assumed.
        {
          // report error and return.
          std::ostringstream oss;
          WErrorCodes ec;
          string err = ec.errorString(rc);
          oss << "Error opening file oid:dbroot:partition:segment = " << colStructs[i].dataOid << ":"
              << colStructs[i].fColDbRoot << ":" << colStructs[i].fColPartition << ":"
              << colStructs[i].fColSegment << " and error code is " << rc << " with message " << err;
          throw std::runtime_error(oss.str());
        }

        allocateValArray(valArray, 1, colStructs[i].colType, colStructs[i].colWidth);

        rc = colOp->writeRows(curCol, ridList.size(), ridList, valArray, 0, true);

        if (rc != NO_ERROR)
        {
          // read error is fixed in place
          if (rc == ERR_COMP_COMPRESS)  // write error
          {
          }
        }

        // flush files will be done in the end of fix.
        colOp->clearColumn(curCol);

        if (valArray != NULL)
          free(valArray);
      }
      else  // dictionary file. How to fix
      {
        // read headers out, uncompress the last chunk, if error, replace it with empty chunk.
        Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
        rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid, dctnryStructList[i].fColDbRoot,
                                dctnryStructList[i].fColPartition, dctnryStructList[i].fColSegment, false);

        rc = dctnry->checkFixLastDictChunk();
        rc = dctnry->closeDctnry(true);
      }
    }
  }

  return rc;
}

/*@flushVMCache - Flush VM cache
 */
/***********************************************************
 * DESCRIPTION:
 *    Flush sytem VM cache
 * PARAMETERS:
 *    none
 * RETURN:
 *    none
 ***********************************************************/
void WriteEngineWrapper::flushVMCache() const
{
  //      int fd = open("/proc/sys/vm/drop_caches", O_WRONLY);
  //      write(fd, "3", 1);
  //      close(fd);
}

#if 0
// XXX temporary for debugging purposes
static void log_this(const char *message,
    logging::LOG_TYPE log_type, unsigned sid)
{
    // corresponds with dbcon in SubsystemID vector
    // in messagelog.cpp
    unsigned int subSystemId = 24;
    logging::LoggingID logid( subSystemId, sid, 0);
    logging::Message::Args args1;
    logging::Message msg(1);
    args1.add(message);
    msg.format( args1 );
    Logger logger(logid.fSubsysID);
    logger.logMessage(log_type, msg, logid);
}
#endif

/** @brief Determine whether we may update a column's ranges (by type) and return nullptr if we can't */
static ExtCPInfo* getCPInfoToUpdateForUpdatableType(const ColStruct& colStruct, ExtCPInfo* currentCPInfo,
                                                    OpType optype)
{
  if (colStruct.tokenFlag)
  {
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
    if (currentCPInfo && currentCPInfo->hasStringsPrefixes() && optype == INSERT)
    {
      return currentCPInfo;
    }
#endif
    return nullptr;
  }
  switch (colStruct.colType)
  {
    // here we enumerate all supported types.
    case WR_BYTE:
    case WR_SHORT:
    case WR_INT:
    case WR_LONGLONG:
    case WR_CHAR:
    case WR_UBYTE:
    case WR_USHORT:
    case WR_UINT:
    case WR_ULONGLONG:
    case WR_MEDINT:
    case WR_UMEDINT:
    case WR_BINARY:
    {
      return currentCPInfo;
    }
    // all unsupported types must not be supported.
    default: return nullptr;  // safe choice for everything we can't do.
  }
}

/** @brief Let only valid ranges to be present.
 *
 * There can be a case that we have computed invalid range while computing updated ranges.
 *
 * These invalid ranges should not have CP_VALID in the status code. So, they must not
 * come into final call to setCPInfos or something.
 *
 * To achieve that, we filter these invalid ranges here.
 */
static void setInvalidCPInfosSpecialMarks(std::vector<ExtCPInfo>& cpInfos)
{
  size_t i;
  for (i = 0; i < cpInfos.size(); i++)
  {
    if (cpInfos[i].isInvalid())
    {
      cpInfos[i].fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE;
    }
  }
}

/*@insertColumnRecs -  Insert value(s) into a column
 */
/***********************************************************
 * DESCRIPTION:
 *    Insert values into  columns (batchinsert)
 * PARAMETERS:
 *    colStructList - column struct list
 *    colValueList - column value list
 * RETURN:
 *    NO_ERROR if success
 *    others if something wrong in inserting the value
 ***********************************************************/

int WriteEngineWrapper::insertColumnRecs(
    const TxnID& txnid, const CSCTypesList& cscColTypeList, ColStructList& colStructList,
    ColValueList& colValueList, DctnryStructList& dctnryStructList, DictStrList& dictStrList,
    std::vector<boost::shared_ptr<DBRootExtentTracker> >& dbRootExtentTrackers, RBMetaWriter* fRBMetaWriter,
    bool bFirstExtentOnThisPM, bool insertSelect, bool isAutoCommitOn, OID tableOid, bool isFirstBatchPm)
{
  int rc;
  RID* rowIdArray = NULL;
  ColTupleList curTupleList;
  Column curCol;
  ColStruct curColStruct;
  ColValueList colOldValueList;
  ColValueList colNewValueList;
  ColStructList newColStructList;
  DctnryStructList newDctnryStructList;
  HWM hwm = 0;
  HWM oldHwm = 0;
  HWM newHwm = 0;
  ColTupleList::size_type totalRow;
  ColStructList::size_type totalColumns;
  uint64_t rowsLeft = 0;
  bool newExtent = false;
  RIDList ridList;
  ColumnOp* colOp = NULL;
  ColSplitMaxMinInfoList maxMins;

  // Set tmp file suffix to modify HDFS db file
  bool useTmpSuffix = false;

  m_opType = INSERT;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    if (!bFirstExtentOnThisPM)
      useTmpSuffix = true;
  }

  unsigned i = 0;
#ifdef PROFILE
  StopWatch timer;
#endif

  // debug information for testing
  if (isDebug(DEBUG_2))
  {
    printInputValue(colStructList, colValueList, ridList, dctnryStructList, dictStrList);
  }

  // end

  // Convert data type and column width to write engine specific
  for (i = 0; i < colStructList.size(); i++)
    Convertor::convertColType(&colStructList[i]);

  for (const auto& colStruct : colStructList)
  {
    ColSplitMaxMinInfo tmp(colStruct.colDataType, colStruct.colWidth);
    maxMins.push_back(tmp);
  }

  uint32_t colId = 0;
  // MCOL-1675: find the smallest column width to calculate the RowID from so
  // that all HWMs will be incremented by this operation
  findSmallestColumn(colId, colStructList);

  // rc = checkValid(txnid, colStructList, colValueList, ridList);
  // if (rc != NO_ERROR)
  //   return rc;

  setTransId(txnid);
  uint16_t dbRoot, segmentNum;
  uint32_t partitionNum;
  string segFile;
  bool newFile;
  TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
  // populate colStructList with file information
  IDBDataFile* pFile = NULL;
  std::vector<DBRootExtentInfo> extentInfo;
  int currentDBrootIdx = 0;
  std::vector<BRM::CreateStripeColumnExtentsArgOut>
      extents;  // this structure valid only when isFirstOnBatchPm is true.
  std::vector<BRM::LBID_t> newExtentsStartingLbids;  // we keep column-indexed LBIDs here for **new** extents.
                                                     // It is set and valid and used only when rowsLeft > 0.

  //--------------------------------------------------------------------------
  // For first batch on this PM:
  //   o get starting extent from ExtentTracker, and allocate extent if needed
  //   o construct colStructList and dctnryStructList accordingly
  //   o save extent information in tableMetaData for future use
  // If not first batch on this PM:
  //   o construct colStructList and dctnryStructList from tableMetaData
  //--------------------------------------------------------------------------
  if (isFirstBatchPm)
  {
    currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx();
    extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList();
    dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
    partitionNum = extentInfo[currentDBrootIdx].fPartition;

    //----------------------------------------------------------------------
    // check whether this extent is the first on this PM
    //----------------------------------------------------------------------
    if (bFirstExtentOnThisPM)
    {
      std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
      BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn;

      for (i = 0; i < colStructList.size(); i++)
      {
        createStripeColumnExtentsArgIn.oid = colStructList[i].dataOid;
        createStripeColumnExtentsArgIn.width = colStructList[i].colWidth;
        createStripeColumnExtentsArgIn.colDataType = colStructList[i].colDataType;
        cols.push_back(createStripeColumnExtentsArgIn);
      }

      rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partitionNum, segmentNum,
                                                               extents);

      if (rc != NO_ERROR)
        return rc;

      // Create column files
      ExtCPInfoList cpinfoList;

      for (i = 0; i < extents.size(); i++)
      {
        ExtCPInfo cpInfo(colStructList[i].colDataType, colStructList[i].colWidth);
        colOp = m_colOp[op(colStructList[i].fCompressionType)];
        colOp->initColumn(curCol);
        colOp->setColParam(curCol, colId, colStructList[i].colWidth, colStructList[i].colDataType,
                           colStructList[i].colType, colStructList[i].dataOid,
                           colStructList[i].fCompressionType, dbRoot, partitionNum, segmentNum);
        colOp->findTypeHandler(colStructList[i].colWidth, colStructList[i].colDataType);
        rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid,
                                 extents[i].allocSize, dbRoot, partitionNum, segmentNum, segFile, pFile,
                                 newFile);

        if (rc != NO_ERROR)
          return rc;

        cpInfo.toInvalid();

        cpInfo.fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE;

        // mark the extents to invalid
        cpInfo.fCPInfo.firstLbid = extents[i].startLbid;
        cpinfoList.push_back(cpInfo);
        colStructList[i].fColPartition = partitionNum;
        colStructList[i].fColSegment = segmentNum;
        colStructList[i].fColDbRoot = dbRoot;
        dctnryStructList[i].fColPartition = partitionNum;
        dctnryStructList[i].fColSegment = segmentNum;
        dctnryStructList[i].fColDbRoot = dbRoot;
      }

      // mark the extents to invalid
      rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);

      if (rc != NO_ERROR)
        return rc;

      // create corresponding dictionary files
      for (i = 0; i < dctnryStructList.size(); i++)
      {
        if (dctnryStructList[i].dctnryOid > 0)
        {
          rc = createDctnry(txnid, dctnryStructList[i].dctnryOid, dctnryStructList[i].colWidth, dbRoot,
                            partitionNum, segmentNum, dctnryStructList[i].fCompressionType);

          if (rc != NO_ERROR)
            return rc;
        }
      }
    }     // if ( bFirstExtentOnThisPM)
    else  // if (!bFirstExtentOnThisPM)
    {
      std::vector<DBRootExtentInfo> tmpExtentInfo;

      for (i = 0; i < dbRootExtentTrackers.size(); i++)
      {
        tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
        colStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
        colStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
        colStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
        dctnryStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
        dctnryStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
        dctnryStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
      }
    }

    //----------------------------------------------------------------------
    // Save the extents info in tableMetaData
    //----------------------------------------------------------------------
    for (i = 0; i < colStructList.size(); i++)
    {
      ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) &&
            (it->segNum == colStructList[i].fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = colStructList[i].fColDbRoot;
        aExt.partNum = colStructList[i].fColPartition;
        aExt.segNum = colStructList[i].fColSegment;
        aExt.compType = colStructList[i].fCompressionType;
        aExt.isDict = false;

        if (bFirstExtentOnThisPM)
        {
          aExt.hwm = extents[i].startBlkOffset;
          aExt.isNewExt = true;
        }
        else
        {
          std::vector<DBRootExtentInfo> tmpExtentInfo;
          tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
          aExt.isNewExt = false;
          aExt.hwm = tmpExtentInfo[currentDBrootIdx].fLocalHwm;
        }

        aExt.current = true;
        aColExtsInfo.push_back(aExt);
      }

      tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
    }

    for (i = 0; i < dctnryStructList.size(); i++)
    {
      if (dctnryStructList[i].dctnryOid > 0)
      {
        ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
        ColExtsInfo::iterator it = aColExtsInfo.begin();

        while (it != aColExtsInfo.end())
        {
          if ((it->dbRoot == dctnryStructList[i].fColDbRoot) &&
              (it->partNum == dctnryStructList[i].fColPartition) &&
              (it->segNum == dctnryStructList[i].fColSegment))
            break;

          it++;
        }

        if (it == aColExtsInfo.end())  // add this one to the list
        {
          ColExtInfo aExt;
          aExt.dbRoot = dctnryStructList[i].fColDbRoot;
          aExt.partNum = dctnryStructList[i].fColPartition;
          aExt.segNum = dctnryStructList[i].fColSegment;
          aExt.compType = dctnryStructList[i].fCompressionType;
          aExt.isDict = true;
          aColExtsInfo.push_back(aExt);
        }

        tableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
      }
    }

  }     // if (isFirstBatchPm)
  else  // get the extent info from tableMetaData
  {
    ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
    ColExtsInfo::iterator it = aColExtsInfo.begin();

    while (it != aColExtsInfo.end())
    {
      if (it->current)
        break;

      it++;
    }

    if (it == aColExtsInfo.end())
      return 1;

    for (i = 0; i < colStructList.size(); i++)
    {
      colStructList[i].fColPartition = it->partNum;
      colStructList[i].fColSegment = it->segNum;
      colStructList[i].fColDbRoot = it->dbRoot;
      dctnryStructList[i].fColPartition = it->partNum;
      dctnryStructList[i].fColSegment = it->segNum;
      dctnryStructList[i].fColDbRoot = it->dbRoot;
    }
  }

  curTupleList = static_cast<ColTupleList>(colValueList[0]);
  totalRow = curTupleList.size();
  totalColumns = colStructList.size();
  rowIdArray = new RID[totalRow];
  // use scoped_array to ensure ptr deletion regardless of where we return
  boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
  memset(rowIdArray, 0, (sizeof(RID) * totalRow));

  //--------------------------------------------------------------------------
  // allocate row id(s)
  //--------------------------------------------------------------------------
  curColStruct = colStructList[colId];
  colOp = m_colOp[op(curColStruct.fCompressionType)];

  colOp->initColumn(curCol);

  // Get the correct segment, partition, column file
  vector<ExtentInfo> colExtentInfo;   // Save those empty extents in case of failure to rollback
  vector<ExtentInfo> dictExtentInfo;  // Save those empty extents in case of failure to rollback
  vector<ExtentInfo> fileInfo;
  dbRoot = curColStruct.fColDbRoot;
  // use the first column to calculate row id
  ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
  ColExtsInfo::iterator it = aColExtsInfo.begin();

  while (it != aColExtsInfo.end())
  {
    if ((it->dbRoot == colStructList[colId].fColDbRoot) &&
        (it->partNum == colStructList[colId].fColPartition) &&
        (it->segNum == colStructList[colId].fColSegment) && it->current)
    {
      break;
    }
    it++;
  }

  if (it != aColExtsInfo.end())
  {
    hwm = it->hwm;
  }

  oldHwm = hwm;  // Save this info for rollback
  // need to pass real dbRoot, partition, and segment to setColParam
  colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType,
                     curColStruct.dataOid, curColStruct.fCompressionType, curColStruct.fColDbRoot,
                     curColStruct.fColPartition, curColStruct.fColSegment);
  colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType);
  rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix);  // @bug 5572 HDFS tmp file

  if (rc != NO_ERROR)
  {
    return rc;
  }

  // get hwm first
  // @bug 286 : fix for bug 286 - correct the typo in getHWM
  // RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));

  Column newCol;

#ifdef PROFILE
  timer.start("allocRowId");
#endif
  newColStructList = colStructList;
  newDctnryStructList = dctnryStructList;
  bool bUseStartExtent = true;

  if (idbdatafile::IDBPolicy::useHdfs())
    insertSelect = true;

  rc = colOp->allocRowId(txnid, bUseStartExtent, curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent,
                         rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
                         dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm,
                         &newExtentsStartingLbids);

  if (rc != NO_ERROR)  // Clean up is already done
    return rc;

#ifdef PROFILE
  timer.stop("allocRowId");
#endif

  //--------------------------------------------------------------------------
  // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
  // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
  //--------------------------------------------------------------------------
  // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
  if ((curCol.dataFile.fPartition == 0) && (curCol.dataFile.fSegment == 0) && ((totalRow - rowsLeft) > 0) &&
      (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
  {
    for (unsigned k = 0; k < colStructList.size(); k++)
    {
      // Skip the selected column
      if (k == colId)
        continue;
      Column expandCol;
      colOp = m_colOp[op(colStructList[k].fCompressionType)];
      colOp->setColParam(expandCol, 0, colStructList[k].colWidth, colStructList[k].colDataType,
                         colStructList[k].colType, colStructList[k].dataOid,
                         colStructList[k].fCompressionType, colStructList[k].fColDbRoot,
                         colStructList[k].fColPartition, colStructList[k].fColSegment);
      colOp->findTypeHandler(colStructList[k].colWidth, colStructList[k].colDataType);
      rc = colOp->openColumnFile(expandCol, segFile, true);  // @bug 5572 HDFS tmp file

      if (rc == NO_ERROR)
      {
        if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
        {
          rc = colOp->expandAbbrevExtent(expandCol);
        }
      }

      if (rc != NO_ERROR)
      {
        return rc;
      }

      colOp->clearColumn(expandCol);  // closes the file (if uncompressed)
    }
  }

  //--------------------------------------------------------------------------
  // Tokenize data if needed
  //--------------------------------------------------------------------------
  if (insertSelect && isAutoCommitOn)
    BRMWrapper::setUseVb(false);
  else
    BRMWrapper::setUseVb(true);

  dictStr::iterator dctStr_iter;
  ColTupleList::iterator col_iter;

  for (i = 0; i < colStructList.size(); i++)
  {
    if (colStructList[i].tokenFlag)
    {
      dctStr_iter = dictStrList[i].begin();
      col_iter = colValueList[i].begin();
      Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
      rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid, dctnryStructList[i].fColDbRoot,
                              dctnryStructList[i].fColPartition, dctnryStructList[i].fColSegment,
                              useTmpSuffix);  // @bug 5572 HDFS tmp file

      if (rc != NO_ERROR)
      {
        // cout << "Error opening dctnry file " << dctnryStructList[i].dctnryOid << endl;
        return rc;
      }

#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
      datatypes::Charset cset(dctnryStructList[i].fCharsetNumber);
#endif
      for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
      {
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
        int64_t strPrefix;
#endif
        if (dctStr_iter->isNull())
        {
          Token nullToken;
          col_iter->data = nullToken;
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
          strPrefix = (int64_t)joblist::UBIGINTNULL;  // the string prefixes are signed long ints.
#endif
        }
        else
        {
#ifdef PROFILE
          timer.start("tokenize");
#endif
          DctnryTuple dctTuple;
          dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
          dctTuple.sigSize = dctStr_iter->length();
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
          strPrefix = encodeStringPrefix(dctTuple.sigValue, dctTuple.sigSize, cset);
#endif
          dctTuple.isNull = false;
          rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);

          if (rc != NO_ERROR)
          {
            dctnry->closeDctnry();
            return rc;
          }

#ifdef PROFILE
          timer.stop("tokenize");
#endif
          col_iter->data = dctTuple.token;
        }

#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
        maxMins[i].fSplitMaxMinInfo[0].addStringPrefix(strPrefix);
#endif
        dctStr_iter++;
        col_iter++;
      }

      // close dictionary files
      rc = dctnry->closeDctnry(false);

      if (rc != NO_ERROR)
        return rc;

      if (newExtent)
      {
        //@Bug 4854 back up hwm chunk for the file to be modified
        if (fRBMetaWriter)
          fRBMetaWriter->backupDctnryHWMChunk(
              newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
              newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment);

        rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
                                newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment,
                                false);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)
          return rc;

        for (uint32_t rows = 0; rows < rowsLeft; rows++)
        {
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
          int64_t strPrefix;
#endif
          if (dctStr_iter->isNull())
          {
            Token nullToken;
            col_iter->data = nullToken;
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
            strPrefix = joblist::UBIGINTNULL;  // string prefixes are signed long ints.
#endif
          }
          else
          {
#ifdef PROFILE
            timer.start("tokenize");
#endif
            DctnryTuple dctTuple;
            dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
            dctTuple.sigSize = dctStr_iter->length();
#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
            strPrefix = encodeStringPrefix_check_null(dctTuple.sigValue, dctTuple.sigSize, cset);
#endif
            dctTuple.isNull = false;
            rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);

            if (rc != NO_ERROR)
            {
              dctnry->closeDctnry();
              return rc;
            }

#ifdef PROFILE
            timer.stop("tokenize");
#endif
            col_iter->data = dctTuple.token;
          }

#if defined(XXX_WRITEENGINE_TOKENS_RANGES_XXX)
          maxMins[i].fSplitMaxMinInfo[1].addStringPrefix(strPrefix);
#endif
          dctStr_iter++;
          col_iter++;
        }

        // close dictionary files
        rc = dctnry->closeDctnry(false);

        if (rc != NO_ERROR)
          return rc;
      }
    }
  }

  if (insertSelect && isAutoCommitOn)
    BRMWrapper::setUseVb(false);
  else
    BRMWrapper::setUseVb(true);

  //--------------------------------------------------------------------------
  // Update column info structure @Bug 1862 set hwm, and
  // Prepare ValueList for new extent (if applicable)
  //--------------------------------------------------------------------------
  //@Bug 2205 Check whether all rows go to the new extent
  RID lastRid = 0;
  RID lastRidNew = 0;

  if (totalRow - rowsLeft > 0)
  {
    lastRid = rowIdArray[totalRow - rowsLeft - 1];
    lastRidNew = rowIdArray[totalRow - 1];
  }
  else
  {
    lastRid = 0;
    lastRidNew = rowIdArray[totalRow - 1];
  }

  // if a new extent is created, all the columns in this table should have their own new extent
  // First column already processed

  //@Bug 1701. Close the file (if uncompressed)
  m_colOp[op(curCol.compressionType)]->clearColumn(curCol);
  // Update hwm to set them in the end
  bool succFlag = false;
  unsigned colWidth = 0;
  int curFbo = 0, curBio;

  for (i = 0; i < totalColumns; i++)
  {
    // shoud be obtained from saved hwm
    aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
    it = aColExtsInfo.begin();

    while (it != aColExtsInfo.end())
    {
      if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) &&
          (it->segNum == colStructList[i].fColSegment) && it->current)
        break;

      it++;
    }

    if (it != aColExtsInfo.end())  // update hwm info
    {
      oldHwm = it->hwm;
    }

    // save hwm for the old extent
    colWidth = colStructList[i].colWidth;
    succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

    if (succFlag)
    {
      if ((HWM)curFbo >= oldHwm)
      {
        it->hwm = (HWM)curFbo;
      }

      //@Bug 4947. set current to false for old extent.
      if (newExtent)
      {
        it->current = false;
      }
    }
    else
      return ERR_INVALID_PARAM;

    // update hwm for the new extent
    if (newExtent)
    {
      it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == newColStructList[i].fColDbRoot) &&
            (it->partNum == newColStructList[i].fColPartition) &&
            (it->segNum == newColStructList[i].fColSegment) && it->current)
          break;

        it++;
      }

      succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      if (succFlag)
      {
        if (it != aColExtsInfo.end())
        {
          it->hwm = (HWM)curFbo;
          // cout << "setting hwm to " << (int)curFbo <<" for seg " <<it->segNum << endl;
          it->current = true;
        }
      }
      else
        return ERR_INVALID_PARAM;
    }

    tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
  }

  //--------------------------------------------------------------------------
  // Prepare the valuelist for the new extent
  //--------------------------------------------------------------------------
  ColTupleList colTupleList;
  ColTupleList newColTupleList;
  ColTupleList firstPartTupleList;

  for (unsigned i = 0; i < totalColumns; i++)
  {
    colTupleList = static_cast<ColTupleList>(colValueList[i]);

    for (uint64_t j = rowsLeft; j > 0; j--)
    {
      newColTupleList.push_back(colTupleList[totalRow - j]);
    }

    colNewValueList.push_back(newColTupleList);
    newColTupleList.clear();

    // update the oldvalue list for the old extent
    for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
    {
      firstPartTupleList.push_back(colTupleList[j]);
    }

    colOldValueList.push_back(firstPartTupleList);
    firstPartTupleList.clear();
  }

  // end of allocate row id

#ifdef PROFILE
  timer.start("writeColumnRec");
#endif

  if (rc == NO_ERROR)
  {
    //----------------------------------------------------------------------
    // Mark extents invalid
    //----------------------------------------------------------------------
    bool successFlag = true;
    unsigned width = 0;
    int curFbo = 0, curBio, lastFbo = -1;

    if (isFirstBatchPm && (totalRow == rowsLeft))
    {
      // in this particular case we already marked extents as invalid above.
    }
    else
    {
      int firstHalfCount = totalRow - rowsLeft;
      for (unsigned i = 0; i < colStructList.size(); i++)
      {
        colOp = m_colOp[op(colStructList[i].fCompressionType)];
        width = colStructList[i].colWidth;
        if (firstHalfCount)
        {
          ExtCPInfo* cpInfoP =
              getCPInfoToUpdateForUpdatableType(colStructList[i], &maxMins[i].fSplitMaxMinInfo[0], m_opType);
          RID thisRid = rowsLeft ? lastRid : lastRidNew;
          successFlag = colOp->calculateRowId(thisRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

          if (successFlag)
          {
            if (curFbo != lastFbo)
            {
              RETURN_ON_ERROR(AddLBIDtoList(txnid, colStructList[i], curFbo, cpInfoP));
            }
          }
          maxMins[i].fSplitMaxMinInfoPtrs[0] = cpInfoP;
        }
        if (rowsLeft)
        {
          ExtCPInfo* cpInfoP =
              getCPInfoToUpdateForUpdatableType(colStructList[i], &maxMins[i].fSplitMaxMinInfo[1], m_opType);
          if (cpInfoP)
          {
            RETURN_ON_ERROR(GetLBIDRange(newExtentsStartingLbids[i], colStructList[i], *cpInfoP));
          }

          maxMins[i].fSplitMaxMinInfoPtrs[1] = cpInfoP;
        }
      }
    }

    markTxnExtentsAsInvalid(txnid);

    //----------------------------------------------------------------------
    // Write row(s) to database file(s)
    //----------------------------------------------------------------------
    std::vector<ExtCPInfo> cpinfoList;
    for (auto& splitCPInfo : maxMins)
    {
      for (i = 0; i < 2; i++)
      {
        ExtCPInfo* cpInfo = splitCPInfo.fSplitMaxMinInfoPtrs[i];
        if (cpInfo)
        {
          cpinfoList.push_back(*cpInfo);
          cpinfoList[cpinfoList.size() - 1].fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE;
        }
      }
    }
    rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);
    if (rc != NO_ERROR)
    {
      return rc;
    }
    rc = writeColumnRec(txnid, cscColTypeList, colStructList, colOldValueList, rowIdArray, newColStructList,
                        colNewValueList, tableOid, useTmpSuffix, true, &maxMins);  // @bug 5572 HDFS tmp file

    if (rc == NO_ERROR)
    {
      if (dctnryStructList.size() > 0)
      {
        vector<BRM::OID_t> oids{static_cast<int32_t>(tableOid)};
        for (const DctnryStruct& dctnryStruct : dctnryStructList)
        {
          oids.push_back(dctnryStruct.dctnryOid);
        }

        rc = flushOIDsFromCache(oids);

        if (rc != 0)
          rc = ERR_BLKCACHE_FLUSH_LIST;  // translate to WE error
      }

      if (rc == NO_ERROR)
      {
        int index = 0;
        for (auto& splitCPInfo : maxMins)
        {
          for (i = 0; i < 2; i++)
          {
            ExtCPInfo* cpInfo = splitCPInfo.fSplitMaxMinInfoPtrs[i];
            if (cpInfo)
            {
              cpinfoList[index] = *cpInfo;
              cpinfoList[index].fCPInfo.seqNum++;
              index++;
            }
          }
        }
        setInvalidCPInfosSpecialMarks(cpinfoList);
        rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);
      }
    }
  }

  return rc;
}

int WriteEngineWrapper::insertColumnRecsBinary(
    const TxnID& txnid, ColStructList& colStructList, std::vector<uint64_t>& colValueList,
    DctnryStructList& dctnryStructList, DictStrList& dictStrList,
    std::vector<boost::shared_ptr<DBRootExtentTracker> >& dbRootExtentTrackers, RBMetaWriter* fRBMetaWriter,
    bool bFirstExtentOnThisPM, bool insertSelect, bool isAutoCommitOn, OID tableOid, bool isFirstBatchPm)
{
  int rc;
  RID* rowIdArray = NULL;
  Column curCol;
  ColStruct curColStruct;
  ColStructList newColStructList;
  std::vector<uint64_t> colNewValueList;
  DctnryStructList newDctnryStructList;
  HWM hwm = 0;
  HWM oldHwm = 0;
  HWM newHwm = 0;
  size_t totalRow;
  ColStructList::size_type totalColumns;
  uint64_t rowsLeft = 0;
  bool newExtent = false;
  RIDList ridList;
  ColumnOp* colOp = NULL;
  std::vector<BRM::LBID_t> dictLbids;

  // Set tmp file suffix to modify HDFS db file
  bool useTmpSuffix = false;

  m_opType = INSERT;

  if (idbdatafile::IDBPolicy::useHdfs())
  {
    if (!bFirstExtentOnThisPM)
      useTmpSuffix = true;
  }

  unsigned i = 0;
#ifdef PROFILE
  StopWatch timer;
#endif

  // Convert data type and column width to write engine specific
  for (i = 0; i < colStructList.size(); i++)
    Convertor::convertColType(&colStructList[i]);

  uint32_t colId = 0;
  // MCOL-1675: find the smallest column width to calculate the RowID from so
  // that all HWMs will be incremented by this operation
  findSmallestColumn(colId, colStructList);

  // rc = checkValid(txnid, colStructList, colValueList, ridList);
  // if (rc != NO_ERROR)
  //   return rc;

  setTransId(txnid);
  uint16_t dbRoot, segmentNum;
  uint32_t partitionNum;
  string segFile;
  bool newFile;
  TableMetaData* tableMetaData = TableMetaData::makeTableMetaData(tableOid);
  // populate colStructList with file information
  IDBDataFile* pFile = NULL;
  std::vector<DBRootExtentInfo> extentInfo;
  int currentDBrootIdx = 0;
  std::vector<BRM::CreateStripeColumnExtentsArgOut> extents;

  //--------------------------------------------------------------------------
  // For first batch on this PM:
  //   o get starting extent from ExtentTracker, and allocate extent if needed
  //   o construct colStructList and dctnryStructList accordingly
  //   o save extent information in tableMetaData for future use
  // If not first batch on this PM:
  //   o construct colStructList and dctnryStructList from tableMetaData
  //--------------------------------------------------------------------------
  if (isFirstBatchPm)
  {
    currentDBrootIdx = dbRootExtentTrackers[colId]->getCurrentDBRootIdx();
    extentInfo = dbRootExtentTrackers[colId]->getDBRootExtentList();
    dbRoot = extentInfo[currentDBrootIdx].fDbRoot;
    partitionNum = extentInfo[currentDBrootIdx].fPartition;

    //----------------------------------------------------------------------
    // check whether this extent is the first on this PM
    //----------------------------------------------------------------------
    if (bFirstExtentOnThisPM)
    {
      // cout << "bFirstExtentOnThisPM is " << bFirstExtentOnThisPM << endl;
      std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
      BRM::CreateStripeColumnExtentsArgIn createStripeColumnExtentsArgIn;

      for (i = 0; i < colStructList.size(); i++)
      {
        createStripeColumnExtentsArgIn.oid = colStructList[i].dataOid;
        createStripeColumnExtentsArgIn.width = colStructList[i].colWidth;
        createStripeColumnExtentsArgIn.colDataType = colStructList[i].colDataType;
        cols.push_back(createStripeColumnExtentsArgIn);
      }

      rc = BRMWrapper::getInstance()->allocateStripeColExtents(cols, dbRoot, partitionNum, segmentNum,
                                                               extents);

      if (rc != NO_ERROR)
        return rc;

      // Create column files
      ExtCPInfoList cpinfoList;

      for (i = 0; i < extents.size(); i++)
      {
        ExtCPInfo cpInfo(colStructList[i].colDataType, colStructList[i].colWidth);
        colOp = m_colOp[op(colStructList[i].fCompressionType)];
        colOp->initColumn(curCol);
        colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType,
                           colStructList[i].colType, colStructList[i].dataOid,
                           colStructList[i].fCompressionType, dbRoot, partitionNum, segmentNum);
        colOp->findTypeHandler(colStructList[i].colWidth, colStructList[i].colDataType);
        rc = colOp->extendColumn(curCol, false, extents[i].startBlkOffset, extents[i].startLbid,
                                 extents[i].allocSize, dbRoot, partitionNum, segmentNum, segFile, pFile,
                                 newFile);

        if (rc != NO_ERROR)
          return rc;

        cpInfo.toInvalid();

        cpInfo.fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE;

        // mark the extents to invalid
        cpInfo.fCPInfo.firstLbid = extents[i].startLbid;
        cpinfoList.push_back(cpInfo);
        colStructList[i].fColPartition = partitionNum;
        colStructList[i].fColSegment = segmentNum;
        colStructList[i].fColDbRoot = dbRoot;
        dctnryStructList[i].fColPartition = partitionNum;
        dctnryStructList[i].fColSegment = segmentNum;
        dctnryStructList[i].fColDbRoot = dbRoot;
      }

      // mark the extents to invalid
      rc = BRMWrapper::getInstance()->setExtentsMaxMin(cpinfoList);

      if (rc != NO_ERROR)
        return rc;

      // create corresponding dictionary files
      for (i = 0; i < dctnryStructList.size(); i++)
      {
        if (dctnryStructList[i].dctnryOid > 0)
        {
          rc = createDctnry(txnid, dctnryStructList[i].dctnryOid, dctnryStructList[i].colWidth, dbRoot,
                            partitionNum, segmentNum, dctnryStructList[i].fCompressionType);

          if (rc != NO_ERROR)
            return rc;
        }
      }
    }     // if ( bFirstExtentOnThisPM)
    else  // if (!bFirstExtentOnThisPM)
    {
      std::vector<DBRootExtentInfo> tmpExtentInfo;

      for (i = 0; i < dbRootExtentTrackers.size(); i++)
      {
        tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
        colStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
        colStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
        colStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
        // cout << "Load from dbrootExtenttracker oid:dbroot:part:seg = " <<colStructList[i].dataOid<<":"
        //<<colStructList[i].fColDbRoot<<":"<<colStructList[i].fColPartition<<":"<<colStructList[i].fColSegment<<endl;
        dctnryStructList[i].fColPartition = tmpExtentInfo[currentDBrootIdx].fPartition;
        dctnryStructList[i].fColSegment = tmpExtentInfo[currentDBrootIdx].fSegment;
        dctnryStructList[i].fColDbRoot = tmpExtentInfo[currentDBrootIdx].fDbRoot;
      }
    }

    //----------------------------------------------------------------------
    // Save the extents info in tableMetaData
    //----------------------------------------------------------------------
    for (i = 0; i < colStructList.size(); i++)
    {
      ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) &&
            (it->segNum == colStructList[i].fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = colStructList[i].fColDbRoot;
        aExt.partNum = colStructList[i].fColPartition;
        aExt.segNum = colStructList[i].fColSegment;
        aExt.compType = colStructList[i].fCompressionType;
        aExt.isDict = false;

        if (bFirstExtentOnThisPM)
        {
          aExt.hwm = extents[i].startBlkOffset;
          aExt.isNewExt = true;
          // cout << "adding a ext to metadata" << endl;
        }
        else
        {
          std::vector<DBRootExtentInfo> tmpExtentInfo;
          tmpExtentInfo = dbRootExtentTrackers[i]->getDBRootExtentList();
          aExt.isNewExt = false;
          aExt.hwm = tmpExtentInfo[currentDBrootIdx].fLocalHwm;
          // cout << "oid " << colStructList[i].dataOid << " gets hwm " << aExt.hwm << endl;
        }

        aExt.current = true;
        aColExtsInfo.push_back(aExt);
        // cout << "get from extentinfo oid:hwm = " << colStructList[i].dataOid << ":" << aExt.hwm << endl;
      }

      tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
    }

    for (i = 0; i < dctnryStructList.size(); i++)
    {
      if (dctnryStructList[i].dctnryOid > 0)
      {
        ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
        ColExtsInfo::iterator it = aColExtsInfo.begin();

        while (it != aColExtsInfo.end())
        {
          if ((it->dbRoot == dctnryStructList[i].fColDbRoot) &&
              (it->partNum == dctnryStructList[i].fColPartition) &&
              (it->segNum == dctnryStructList[i].fColSegment))
            break;

          it++;
        }

        if (it == aColExtsInfo.end())  // add this one to the list
        {
          ColExtInfo aExt;
          aExt.dbRoot = dctnryStructList[i].fColDbRoot;
          aExt.partNum = dctnryStructList[i].fColPartition;
          aExt.segNum = dctnryStructList[i].fColSegment;
          aExt.compType = dctnryStructList[i].fCompressionType;
          aExt.isDict = true;
          aColExtsInfo.push_back(aExt);
        }

        tableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
      }
    }

  }     // if (isFirstBatchPm)
  else  // get the extent info from tableMetaData
  {
    ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
    ColExtsInfo::iterator it = aColExtsInfo.begin();

    while (it != aColExtsInfo.end())
    {
      if (it->current)
        break;

      it++;
    }

    if (it == aColExtsInfo.end())
      return 1;

    for (i = 0; i < colStructList.size(); i++)
    {
      colStructList[i].fColPartition = it->partNum;
      colStructList[i].fColSegment = it->segNum;
      colStructList[i].fColDbRoot = it->dbRoot;
      dctnryStructList[i].fColPartition = it->partNum;
      dctnryStructList[i].fColSegment = it->segNum;
      dctnryStructList[i].fColDbRoot = it->dbRoot;
    }
  }

  totalColumns = colStructList.size();
  totalRow = colValueList.size() / totalColumns;
  rowIdArray = new RID[totalRow];
  // use scoped_array to ensure ptr deletion regardless of where we return
  boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
  memset(rowIdArray, 0, (sizeof(RID) * totalRow));

  //--------------------------------------------------------------------------
  // allocate row id(s)
  //--------------------------------------------------------------------------

  curColStruct = colStructList[colId];

  colOp = m_colOp[op(curColStruct.fCompressionType)];

  colOp->initColumn(curCol);

  // Get the correct segment, partition, column file
  vector<ExtentInfo> colExtentInfo;   // Save those empty extents in case of failure to rollback
  vector<ExtentInfo> dictExtentInfo;  // Save those empty extents in case of failure to rollback
  vector<ExtentInfo> fileInfo;
  dbRoot = curColStruct.fColDbRoot;
  // use the smallest column to calculate row id
  ColExtsInfo aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[colId].dataOid);
  ColExtsInfo::iterator it = aColExtsInfo.begin();

  while (it != aColExtsInfo.end())
  {
    if ((it->dbRoot == colStructList[colId].fColDbRoot) &&
        (it->partNum == colStructList[colId].fColPartition) &&
        (it->segNum == colStructList[colId].fColSegment) && it->current)
      break;

    it++;
  }

  if (it != aColExtsInfo.end())
  {
    hwm = it->hwm;
    // cout << "Got from colextinfo hwm for oid " << colStructList[colId].dataOid << " is " << hwm << " and
    // seg is " << colStructList[colId].fColSegment << endl;
  }

  oldHwm = hwm;  // Save this info for rollback
  // need to pass real dbRoot, partition, and segment to setColParam
  colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType,
                     curColStruct.dataOid, curColStruct.fCompressionType, curColStruct.fColDbRoot,
                     curColStruct.fColPartition, curColStruct.fColSegment);
  colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType);
  rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix);  // @bug 5572 HDFS tmp file

  if (rc != NO_ERROR)
  {
    return rc;
  }

  // get hwm first
  // @bug 286 : fix for bug 286 - correct the typo in getHWM
  // RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));

  Column newCol;

#ifdef PROFILE
  timer.start("allocRowId");
#endif
  newColStructList = colStructList;
  newDctnryStructList = dctnryStructList;
  bool bUseStartExtent = true;

  if (idbdatafile::IDBPolicy::useHdfs())
    insertSelect = true;

  rc = colOp->allocRowId(txnid, bUseStartExtent, curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent,
                         rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
                         dbRootExtentTrackers, insertSelect, true, tableOid, isFirstBatchPm, NULL);

  // cout << "after allocrowid, total row = " <<totalRow << " newExtent is " << newExtent << endl;
  // cout << "column oid " << curColStruct.dataOid << " has hwm:newHwm = " << hwm <<":" << newHwm<< endl;
  if (rc != NO_ERROR)  // Clean up is already done
    return rc;

#ifdef PROFILE
  timer.stop("allocRowId");
#endif

  //--------------------------------------------------------------------------
  // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
  // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
  //--------------------------------------------------------------------------
  // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
  if ((curCol.dataFile.fPartition == 0) && (curCol.dataFile.fSegment == 0) && ((totalRow - rowsLeft) > 0) &&
      (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
  {
    for (size_t k = 0; k < colStructList.size(); k++)
    {
      // Skip the selected column
      if (k == (size_t)colId)
        continue;

      Column expandCol;
      colOp = m_colOp[op(colStructList[k].fCompressionType)];
      // Shouldn't we change 0 to colId here?
      colOp->setColParam(expandCol, 0, colStructList[k].colWidth, colStructList[k].colDataType,
                         colStructList[k].colType, colStructList[k].dataOid,
                         colStructList[k].fCompressionType, colStructList[k].fColDbRoot,
                         colStructList[k].fColPartition, colStructList[k].fColSegment);
      colOp->findTypeHandler(colStructList[k].colWidth, colStructList[k].colDataType);
      rc = colOp->openColumnFile(expandCol, segFile, true);  // @bug 5572 HDFS tmp file

      if (rc == NO_ERROR)
      {
        if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
        {
          rc = colOp->expandAbbrevExtent(expandCol);
        }
      }

      if (rc != NO_ERROR)
      {
        return rc;
      }

      colOp->closeColumnFile(expandCol);
    }
  }

  //--------------------------------------------------------------------------
  // Tokenize data if needed
  //--------------------------------------------------------------------------
  if (insertSelect && isAutoCommitOn)
    BRMWrapper::setUseVb(false);
  else
    BRMWrapper::setUseVb(true);

  dictStr::iterator dctStr_iter;
  uint64_t* colValPtr;
  size_t rowsPerColumn = colValueList.size() / colStructList.size();

  for (i = 0; i < colStructList.size(); i++)
  {
    if (colStructList[i].tokenFlag)
    {
      dctStr_iter = dictStrList[i].begin();
      Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];
      rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid, dctnryStructList[i].fColDbRoot,
                              dctnryStructList[i].fColPartition, dctnryStructList[i].fColSegment,
                              useTmpSuffix);  // @bug 5572 HDFS tmp file

      if (rc != NO_ERROR)
      {
        return rc;
      }

      for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
      {
        colValPtr = &colValueList[(i * rowsPerColumn) + rows];

        if (dctStr_iter->isNull())
        {
          Token nullToken;
          memcpy(colValPtr, &nullToken, 8);
        }
        else
        {
#ifdef PROFILE
          timer.start("tokenize");
#endif
          DctnryTuple dctTuple;
          dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
          dctTuple.sigSize = dctStr_iter->length();
          dctTuple.isNull = false;
          rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);

          if (rc != NO_ERROR)
          {
            dctnry->closeDctnry();
            return rc;
          }

#ifdef PROFILE
          timer.stop("tokenize");
#endif
          memcpy(colValPtr, &dctTuple.token, 8);
          dictLbids.push_back(dctTuple.token.fbo);
        }

        dctStr_iter++;
      }

      // close dictionary files
      rc = dctnry->closeDctnry(false);

      if (rc != NO_ERROR)
        return rc;

      if (newExtent)
      {
        //@Bug 4854 back up hwm chunk for the file to be modified
        if (fRBMetaWriter)
          fRBMetaWriter->backupDctnryHWMChunk(
              newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
              newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment);

        rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
                                newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment,
                                false);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)
          return rc;

        for (uint32_t rows = 0; rows < rowsLeft; rows++)
        {
          colValPtr = &colValueList[(i * rowsPerColumn) + rows];

          if (dctStr_iter->isNull())
          {
            Token nullToken;
            memcpy(colValPtr, &nullToken, 8);
          }
          else
          {
#ifdef PROFILE
            timer.start("tokenize");
#endif
            DctnryTuple dctTuple;
            dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
            dctTuple.sigSize = dctStr_iter->length();
            dctTuple.isNull = false;
            rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);

            if (rc != NO_ERROR)
            {
              dctnry->closeDctnry();
              return rc;
            }

#ifdef PROFILE
            timer.stop("tokenize");
#endif
            memcpy(colValPtr, &dctTuple.token, 8);
            dictLbids.push_back(dctTuple.token.fbo);
          }

          dctStr_iter++;
        }

        // close dictionary files
        rc = dctnry->closeDctnry(false);

        if (rc != NO_ERROR)
          return rc;
      }
    }
  }

  if (insertSelect && isAutoCommitOn)
    BRMWrapper::setUseVb(false);
  else
    BRMWrapper::setUseVb(true);

  //--------------------------------------------------------------------------
  // Update column info structure @Bug 1862 set hwm, and
  // Prepare ValueList for new extent (if applicable)
  //--------------------------------------------------------------------------
  //@Bug 2205 Check whether all rows go to the new extent
  RID lastRid = 0;
  RID lastRidNew = 0;

  if (totalRow - rowsLeft > 0)
  {
    lastRid = rowIdArray[totalRow - rowsLeft - 1];
    lastRidNew = rowIdArray[totalRow - 1];
  }
  else
  {
    lastRid = 0;
    lastRidNew = rowIdArray[totalRow - 1];
  }

  // cout << "rowid allocated is "  << lastRid << endl;
  // if a new extent is created, all the columns in this table should have their own new extent
  // First column already processed

  //@Bug 1701. Close the file (if uncompressed)
  m_colOp[op(curCol.compressionType)]->closeColumnFile(curCol);
  // cout << "Saving hwm info for new ext batch" << endl;
  // Update hwm to set them in the end
  bool succFlag = false;
  unsigned colWidth = 0;
  int curFbo = 0, curBio;

  for (i = 0; i < totalColumns; i++)
  {
    // shoud be obtained from saved hwm
    aColExtsInfo = tableMetaData->getColExtsInfo(colStructList[i].dataOid);
    it = aColExtsInfo.begin();

    while (it != aColExtsInfo.end())
    {
      if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) &&
          (it->segNum == colStructList[i].fColSegment) && it->current)
        break;

      it++;
    }

    if (it != aColExtsInfo.end())  // update hwm info
    {
      oldHwm = it->hwm;

      // save hwm for the old extent
      colWidth = colStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      // cout << "insertcolumnrec   oid:rid:fbo:oldhwm = " << colStructList[i].dataOid << ":" << lastRid <<
      // ":" << curFbo << ":" << oldHwm << endl;
      if (succFlag)
      {
        if ((HWM)curFbo >= oldHwm)
        {
          it->hwm = (HWM)curFbo;
        }

        //@Bug 4947. set current to false for old extent.
        if (newExtent)
        {
          it->current = false;
        }

        // cout << "updated old ext info for oid " << colStructList[i].dataOid << "
        // dbroot:part:seg:hwm:current = "
        //<< it->dbRoot<<":"<<it->partNum<<":"<<it->segNum<<":"<<it->hwm<<":"<< it->current<< " and newExtent
        //is " << newExtent << endl;
      }
      else
        return ERR_INVALID_PARAM;
    }

    // update hwm for the new extent
    if (newExtent)
    {
      it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == newColStructList[i].fColDbRoot) &&
            (it->partNum == newColStructList[i].fColPartition) &&
            (it->segNum == newColStructList[i].fColSegment) && it->current)
          break;

        it++;
      }

      colWidth = newColStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      if (succFlag)
      {
        if (it != aColExtsInfo.end())
        {
          it->hwm = (HWM)curFbo;
          // cout << "setting hwm to " << (int)curFbo <<" for seg " <<it->segNum << endl;
          it->current = true;
        }
      }
      else
        return ERR_INVALID_PARAM;
    }

    tableMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
  }

  //--------------------------------------------------------------------------
  // Prepare the valuelist for the new extent
  //--------------------------------------------------------------------------

  for (unsigned i = 1; i <= totalColumns; i++)
  {
    // Copy values to second value list
    for (uint64_t j = rowsLeft; j > 0; j--)
    {
      colNewValueList.push_back(colValueList[(totalRow * i) - j]);
    }
  }

  // end of allocate row id

#ifdef PROFILE
  timer.start("writeColumnRec");
#endif
  // cout << "Writing column record" << endl;

  if (rc == NO_ERROR)
  {
    //----------------------------------------------------------------------
    // Mark extents invalid
    //----------------------------------------------------------------------
    bool successFlag = true;
    unsigned width = 0;
    int curFbo = 0, curBio, lastFbo = -1;

    if (isFirstBatchPm && (totalRow == rowsLeft))
    {
    }
    else
    {
      for (unsigned i = 0; i < colStructList.size(); i++)
      {
        colOp = m_colOp[op(colStructList[i].fCompressionType)];
        width = colStructList[i].colWidth;
        successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

        if (successFlag)
        {
          if (curFbo != lastFbo)
          {
            RETURN_ON_ERROR(AddLBIDtoList(txnid, colStructList[i], curFbo));
          }
        }
        else
          return ERR_INVALID_PARAM;
      }
    }

    // If we create a new extent for this batch
    for (unsigned i = 0; i < newColStructList.size(); i++)
    {
      colOp = m_colOp[op(newColStructList[i].fCompressionType)];
      width = newColStructList[i].colWidth;
      successFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / width, width, curFbo, curBio);

      if (successFlag)
      {
        if (curFbo != lastFbo)
        {
          RETURN_ON_ERROR(AddLBIDtoList(txnid, newColStructList[i], curFbo));
        }
      }
      else
        return ERR_INVALID_PARAM;
    }

    markTxnExtentsAsInvalid(txnid);

    //----------------------------------------------------------------------
    // Write row(s) to database file(s)
    //----------------------------------------------------------------------
    bool versioning = !(isAutoCommitOn && insertSelect);
    AddDictToList(txnid, dictLbids);
    rc =
        writeColumnRecBinary(txnid, colStructList, colValueList, rowIdArray, newColStructList,
                             colNewValueList, tableOid, useTmpSuffix, versioning);  // @bug 5572 HDFS tmp file
  }

  return rc;
}

int WriteEngineWrapper::insertColumnRec_SYS(const TxnID& txnid, const CSCTypesList& cscColTypeList,
                                            ColStructList& colStructList, ColValueList& colValueList,
                                            DctnryStructList& dctnryStructList, DictStrList& dictStrList,
                                            const int32_t tableOid)
{
  int rc;
  RID* rowIdArray = NULL;
  ColTupleList curTupleList;
  Column curCol;
  ColStruct curColStruct;
  ColValueList colOldValueList;
  ColValueList colNewValueList;
  ColStructList newColStructList;
  DctnryStructList newDctnryStructList;
  HWM hwm = 0;
  HWM newHwm = 0;
  HWM oldHwm = 0;
  ColTupleList::size_type totalRow;
  ColStructList::size_type totalColumns;
  uint64_t rowsLeft = 0;
  bool newExtent = false;
  RIDList ridList;
  ColumnOp* colOp = NULL;
  uint32_t i = 0;
#ifdef PROFILE
  StopWatch timer;
#endif

  m_opType = INSERT;

  // debug information for testing
  if (isDebug(DEBUG_2))
  {
    printInputValue(colStructList, colValueList, ridList, dctnryStructList, dictStrList);
  }

  // end

  // Convert data type and column width to write engine specific
  for (i = 0; i < colStructList.size(); i++)
    Convertor::convertColType(&colStructList[i]);

  rc = checkValid(txnid, colStructList, colValueList, ridList);

  if (rc != NO_ERROR)
    return rc;

  setTransId(txnid);

  curTupleList = static_cast<ColTupleList>(colValueList[0]);
  totalRow = curTupleList.size();
  totalColumns = colStructList.size();
  rowIdArray = new RID[totalRow];
  // use scoped_array to ensure ptr deletion regardless of where we return
  boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
  memset(rowIdArray, 0, (sizeof(RID) * totalRow));

  // allocate row id(s)
  curColStruct = colStructList[0];
  colOp = m_colOp[op(curColStruct.fCompressionType)];

  colOp->initColumn(curCol);

  // Get the correct segment, partition, column file
  uint16_t dbRoot, segmentNum;
  uint32_t partitionNum;
  vector<ExtentInfo> colExtentInfo;   // Save those empty extents in case of failure to rollback
  vector<ExtentInfo> dictExtentInfo;  // Save those empty extents in case of failure to rollback
  vector<ExtentInfo> fileInfo;
  ExtentInfo info;
  // Don't search for empty space, always append to the end. May need to revisit this part
  dbRoot = curColStruct.fColDbRoot;
  int extState;
  bool extFound;
  RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(curColStruct.dataOid, dbRoot, partitionNum,
                                                               segmentNum, hwm, extState, extFound));

  for (i = 0; i < colStructList.size(); i++)
  {
    colStructList[i].fColPartition = partitionNum;
    colStructList[i].fColSegment = segmentNum;
    colStructList[i].fColDbRoot = dbRoot;
  }

  oldHwm = hwm;  // Save this info for rollback
  // need to pass real dbRoot, partition, and segment to setColParam
  colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType,
                     curColStruct.dataOid, curColStruct.fCompressionType, dbRoot, partitionNum, segmentNum);
  colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType);
  string segFile;
  rc = colOp->openColumnFile(curCol, segFile, false);  // @bug 5572 HDFS tmp file

  if (rc != NO_ERROR)
  {
    return rc;
  }

  // get hwm first
  // @bug 286 : fix for bug 286 - correct the typo in getHWM
  // RETURN_ON_ERROR(BRMWrapper::getInstance()->getHWM(curColStruct.dataOid, hwm));

  //...Note that we are casting totalRow to int to be in sync with
  //...allocRowId().  So we are assuming that totalRow
  //...(curTupleList.size()) will fit into an int.  We arleady made
  //...that assumption earlier in this method when we used totalRow
  //...in the call to calloc() to allocate rowIdArray.
  Column newCol;
  bool newFile;

#ifdef PROFILE
  timer.start("allocRowId");
#endif

  newColStructList = colStructList;
  newDctnryStructList = dctnryStructList;
  std::vector<boost::shared_ptr<DBRootExtentTracker> > dbRootExtentTrackers;
  bool bUseStartExtent = true;
  rc = colOp->allocRowId(txnid, bUseStartExtent, curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent,
                         rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
                         dbRootExtentTrackers, false, false, 0, false, NULL);

  if ((rc == ERR_FILE_DISK_SPACE) && newExtent)
  {
    for (i = 0; i < newColStructList.size(); i++)
    {
      info.oid = newColStructList[i].dataOid;
      info.partitionNum = newColStructList[i].fColPartition;
      info.segmentNum = newColStructList[i].fColSegment;
      info.dbRoot = newColStructList[i].fColDbRoot;

      if (newFile)
        fileInfo.push_back(info);

      colExtentInfo.push_back(info);
    }

    int rc1 = BRMWrapper::getInstance()->deleteEmptyColExtents(colExtentInfo);

    if ((rc1 == 0) && newFile)
    {
      rc1 = colOp->deleteFile(fileInfo[0].oid, fileInfo[0].dbRoot, fileInfo[0].partitionNum,
                              fileInfo[0].segmentNum);

      if (rc1 != NO_ERROR)
        return rc;

      FileOp fileOp;

      for (i = 0; i < newDctnryStructList.size(); i++)
      {
        if (newDctnryStructList[i].dctnryOid > 0)
        {
          info.oid = newDctnryStructList[i].dctnryOid;
          info.partitionNum = newDctnryStructList[i].fColPartition;
          info.segmentNum = newDctnryStructList[i].fColSegment;
          info.dbRoot = newDctnryStructList[i].fColDbRoot;
          info.newFile = true;
          fileInfo.push_back(info);
          dictExtentInfo.push_back(info);
        }
      }

      if (dictExtentInfo.size() > 0)
      {
        rc1 = BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);

        if (rc1 != NO_ERROR)
          return rc;

        for (unsigned j = 0; j < fileInfo.size(); j++)
        {
          rc1 = fileOp.deleteFile(fileInfo[j].oid, fileInfo[j].dbRoot, fileInfo[j].partitionNum,
                                  fileInfo[j].segmentNum);
        }
      }
    }
  }

  TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableOid);

  //..Expand initial abbreviated extent if any RID in 1st extent is > 256K
  // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
  if ((partitionNum == 0) && (segmentNum == 0) && ((totalRow - rowsLeft) > 0) &&
      (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
  {
    for (size_t k = 1; k < colStructList.size(); k++)
    {
      Column expandCol;
      colOp = m_colOp[op(colStructList[k].fCompressionType)];
      colOp->setColParam(expandCol, 0, colStructList[k].colWidth, colStructList[k].colDataType,
                         colStructList[k].colType, colStructList[k].dataOid,
                         colStructList[k].fCompressionType, dbRoot, partitionNum, segmentNum);
      colOp->findTypeHandler(colStructList[k].colWidth, colStructList[k].colDataType);
      rc = colOp->openColumnFile(expandCol, segFile, false);  // @bug 5572 HDFS tmp file

      if (rc == NO_ERROR)
      {
        if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
        {
          rc = colOp->expandAbbrevExtent(expandCol);
        }
      }

      if (rc != NO_ERROR)
      {
        if (newExtent)
        {
          // Remove the empty extent added to the first column
          int rc1 = BRMWrapper::getInstance()->deleteEmptyColExtents(colExtentInfo);

          if ((rc1 == 0) && newFile)
          {
            rc1 = colOp->deleteFile(fileInfo[0].oid, fileInfo[0].dbRoot, fileInfo[0].partitionNum,
                                    fileInfo[0].segmentNum);
          }
        }

        colOp->clearColumn(expandCol);  // closes the file
        return rc;
      }

      colOp->clearColumn(expandCol);  // closes the file
    }
  }

  BRMWrapper::setUseVb(true);
  // Tokenize data if needed
  dictStr::iterator dctStr_iter;
  ColTupleList::iterator col_iter;

  for (i = 0; i < colStructList.size(); i++)
  {
    if (colStructList[i].tokenFlag)
    {
      dctStr_iter = dictStrList[i].begin();
      col_iter = colValueList[i].begin();
      Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];

      dctnryStructList[i].fColPartition = partitionNum;
      dctnryStructList[i].fColSegment = segmentNum;
      dctnryStructList[i].fColDbRoot = dbRoot;
      rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid, dctnryStructList[i].fColDbRoot,
                              dctnryStructList[i].fColPartition, dctnryStructList[i].fColSegment,
                              false);  // @bug 5572 HDFS tmp file

      if (rc != NO_ERROR)
        return rc;

      ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == dctnryStructList[i].fColDbRoot) &&
            (it->partNum == dctnryStructList[i].fColPartition) &&
            (it->segNum == dctnryStructList[i].fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = dctnryStructList[i].fColDbRoot;
        aExt.partNum = dctnryStructList[i].fColPartition;
        aExt.segNum = dctnryStructList[i].fColSegment;
        aExt.compType = dctnryStructList[i].fCompressionType;
        aExt.isDict = true;
        aColExtsInfo.push_back(aExt);
        aTableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
      }

      for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
      {
        if (dctStr_iter->isNull())
        {
          Token nullToken;
          col_iter->data = nullToken;
        }
        else
        {
#ifdef PROFILE
          timer.start("tokenize");
#endif
          DctnryTuple dctTuple;
          dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
          dctTuple.sigSize = dctStr_iter->length();
          dctTuple.isNull = false;
          rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);

          if (rc != NO_ERROR)
          {
            dctnry->closeDctnry();
            return rc;
          }

#ifdef PROFILE
          timer.stop("tokenize");
#endif
          col_iter->data = dctTuple.token;
        }

        dctStr_iter++;
        col_iter++;
      }

      // close dictionary files
      rc = dctnry->closeDctnry();

      if (rc != NO_ERROR)
        return rc;

      if (newExtent)
      {
        rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
                                newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment,
                                false);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)
          return rc;

        aColExtsInfo = aTableMetaData->getColExtsInfo(newDctnryStructList[i].dctnryOid);
        it = aColExtsInfo.begin();

        while (it != aColExtsInfo.end())
        {
          if ((it->dbRoot == newDctnryStructList[i].fColDbRoot) &&
              (it->partNum == newDctnryStructList[i].fColPartition) &&
              (it->segNum == newDctnryStructList[i].fColSegment))
            break;

          it++;
        }

        if (it == aColExtsInfo.end())  // add this one to the list
        {
          ColExtInfo aExt;
          aExt.dbRoot = newDctnryStructList[i].fColDbRoot;
          aExt.partNum = newDctnryStructList[i].fColPartition;
          aExt.segNum = newDctnryStructList[i].fColSegment;
          aExt.compType = newDctnryStructList[i].fCompressionType;
          aExt.isDict = true;
          aColExtsInfo.push_back(aExt);
          aTableMetaData->setColExtsInfo(newDctnryStructList[i].dctnryOid, aColExtsInfo);
        }

        for (uint32_t rows = 0; rows < rowsLeft; rows++)
        {
          if (dctStr_iter->isNull())
          {
            Token nullToken;
            col_iter->data = nullToken;
          }
          else
          {
#ifdef PROFILE
            timer.start("tokenize");
#endif
            DctnryTuple dctTuple;
            dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
            dctTuple.sigSize = dctStr_iter->length();
            dctTuple.isNull = false;
            rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);

            if (rc != NO_ERROR)
            {
              dctnry->closeDctnry();
              return rc;
            }

#ifdef PROFILE
            timer.stop("tokenize");
#endif
            col_iter->data = dctTuple.token;
          }

          dctStr_iter++;
          col_iter++;
        }

        // close dictionary files
        rc = dctnry->closeDctnry();

        if (rc != NO_ERROR)
          return rc;
      }
    }
  }

  // Update column info structure @Bug 1862 set hwm
  //@Bug 2205 Check whether all rows go to the new extent
  RID lastRid = 0;
  RID lastRidNew = 0;

  if (totalRow - rowsLeft > 0)
  {
    lastRid = rowIdArray[totalRow - rowsLeft - 1];
    lastRidNew = rowIdArray[totalRow - 1];
  }
  else
  {
    lastRid = 0;
    lastRidNew = rowIdArray[totalRow - 1];
  }

  // cout << "rowid allocated is "  << lastRid << endl;
  // if a new extent is created, all the columns in this table should have their own new extent

  //@Bug 1701. Close the file
  m_colOp[op(curCol.compressionType)]->clearColumn(curCol);
  std::vector<BulkSetHWMArg> hwmVecNewext;
  std::vector<BulkSetHWMArg> hwmVecOldext;

  if (newExtent)  // Save all hwms to set them later.
  {
    BulkSetHWMArg aHwmEntryNew;
    BulkSetHWMArg aHwmEntryOld;
    bool succFlag = false;
    unsigned colWidth = 0;
    int extState;
    bool extFound;
    int curFbo = 0, curBio;

    for (i = 0; i < totalColumns; i++)
    {
      Column curColLocal;
      colOp->initColumn(curColLocal);

      colOp = m_colOp[op(newColStructList[i].fCompressionType)];
      colOp->setColParam(curColLocal, 0, newColStructList[i].colWidth, newColStructList[i].colDataType,
                         newColStructList[i].colType, newColStructList[i].dataOid,
                         newColStructList[i].fCompressionType, dbRoot, partitionNum, segmentNum);
      colOp->findTypeHandler(newColStructList[i].colWidth, newColStructList[i].colDataType);

      rc = BRMWrapper::getInstance()->getLastHWM_DBroot(curColLocal.dataFile.fid, dbRoot, partitionNum,
                                                        segmentNum, oldHwm, extState, extFound);

      info.oid = curColLocal.dataFile.fid;
      info.partitionNum = partitionNum;
      info.segmentNum = segmentNum;
      info.dbRoot = dbRoot;
      info.hwm = oldHwm;
      colExtentInfo.push_back(info);
      // @Bug 2714 need to set hwm for the old extent
      colWidth = colStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      // cout << "insertcolumnrec   oid:rid:fbo:hwm = " << colStructList[i].dataOid << ":" << lastRid << ":"
      // << curFbo << ":" << hwm << endl;
      if (succFlag)
      {
        if ((HWM)curFbo > oldHwm)
        {
          aHwmEntryOld.oid = colStructList[i].dataOid;
          aHwmEntryOld.partNum = partitionNum;
          aHwmEntryOld.segNum = segmentNum;
          aHwmEntryOld.hwm = curFbo;
          hwmVecOldext.push_back(aHwmEntryOld);
        }
      }
      else
        return ERR_INVALID_PARAM;

      colWidth = newColStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      if (succFlag)
      {
        aHwmEntryNew.oid = newColStructList[i].dataOid;
        aHwmEntryNew.partNum = newColStructList[i].fColPartition;
        aHwmEntryNew.segNum = newColStructList[i].fColSegment;
        aHwmEntryNew.hwm = curFbo;
        hwmVecNewext.push_back(aHwmEntryNew);
      }

      m_colOp[op(curColLocal.compressionType)]->clearColumn(curColLocal);
    }

    // Prepare the valuelist for the new extent
    ColTupleList colTupleList;
    ColTupleList newColTupleList;
    ColTupleList firstPartTupleList;

    for (unsigned i = 0; i < totalColumns; i++)
    {
      colTupleList = static_cast<ColTupleList>(colValueList[i]);

      for (uint64_t j = rowsLeft; j > 0; j--)
      {
        newColTupleList.push_back(colTupleList[totalRow - j]);
      }

      colNewValueList.push_back(newColTupleList);
      newColTupleList.clear();

      // upate the oldvalue list for the old extent
      for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
      {
        firstPartTupleList.push_back(colTupleList[j]);
      }

      colOldValueList.push_back(firstPartTupleList);
      firstPartTupleList.clear();
    }
  }

  // Mark extents invalid
  bool successFlag = true;
  unsigned width = 0;
  int curFbo = 0, curBio, lastFbo = -1;

  if (totalRow - rowsLeft > 0)
  {
    for (unsigned i = 0; i < colStructList.size(); i++)
    {
      colOp = m_colOp[op(colStructList[i].fCompressionType)];
      width = colStructList[i].colWidth;
      successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

      if (successFlag)
      {
        if (curFbo != lastFbo)
        {
          RETURN_ON_ERROR(AddLBIDtoList(txnid, colStructList[i], curFbo));
        }
      }
    }
  }

  lastRid = rowIdArray[totalRow - 1];

  for (unsigned i = 0; i < newColStructList.size(); i++)
  {
    colOp = m_colOp[op(newColStructList[i].fCompressionType)];
    width = newColStructList[i].colWidth;
    successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

    if (successFlag)
    {
      if (curFbo != lastFbo)
      {
        RETURN_ON_ERROR(AddLBIDtoList(txnid, newColStructList[i], curFbo));
      }
    }
  }

  // cout << "lbids size = " << lbids.size()<< endl;
  markTxnExtentsAsInvalid(txnid);

  if (rc == NO_ERROR)
  {
    // MCOL-66 The DBRM can't handle concurrent transactions to sys tables
    static boost::mutex dbrmMutex;
    boost::mutex::scoped_lock lk(dbrmMutex);

    if (newExtent)
    {
      rc = writeColumnRec(txnid, cscColTypeList, colStructList, colOldValueList, rowIdArray, newColStructList,
                          colNewValueList, tableOid, false);  // @bug 5572 HDFS tmp file
    }
    else
    {
      rc = writeColumnRec(txnid, cscColTypeList, colStructList, colValueList, rowIdArray, newColStructList,
                          colNewValueList, tableOid, false);  // @bug 5572 HDFS tmp file
    }
  }

#ifdef PROFILE
  timer.stop("writeColumnRec");
#endif
  //   for (ColTupleList::size_type  i = 0; i < totalRow; i++)
  //      ridList.push_back((RID) rowIdArray[i]);

  // if (rc == NO_ERROR)
  //   rc = flushDataFiles(NO_ERROR);

  if (!newExtent)
  {
    // flushVMCache();
    bool succFlag = false;
    unsigned colWidth = 0;
    int extState;
    bool extFound;
    int curFbo = 0, curBio;
    std::vector<BulkSetHWMArg> hwmVec;

    for (unsigned i = 0; i < totalColumns; i++)
    {
      // colOp = m_colOp[op(colStructList[i].fCompressionType)];
      // Set all columns hwm together
      BulkSetHWMArg aHwmEntry;
      RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(
          colStructList[i].dataOid, dbRoot, partitionNum, segmentNum, hwm, extState, extFound));
      colWidth = colStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      // cout << "insertcolumnrec   oid:rid:fbo:hwm = " << colStructList[i].dataOid << ":" << lastRid << ":"
      // << curFbo << ":" << hwm << endl;
      if (succFlag)
      {
        if ((HWM)curFbo > hwm)
        {
          aHwmEntry.oid = colStructList[i].dataOid;
          aHwmEntry.partNum = partitionNum;
          aHwmEntry.segNum = segmentNum;
          aHwmEntry.hwm = curFbo;
          hwmVec.push_back(aHwmEntry);
        }
      }
      else
        return ERR_INVALID_PARAM;
    }

    if (hwmVec.size() > 0)
    {
      std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
      RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVec, mergeCPDataArgs));
    }
  }

  if (newExtent)
  {
#ifdef PROFILE
    timer.start("flushVMCache");
#endif
    std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
    RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVecNewext, mergeCPDataArgs));
    RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVecOldext, mergeCPDataArgs));
    // flushVMCache();
#ifdef PROFILE
    timer.stop("flushVMCache");
#endif
  }

#ifdef PROFILE
  timer.finish();
#endif
  return rc;
}

int WriteEngineWrapper::insertColumnRec_Single(const TxnID& txnid, const CSCTypesList& cscColTypeList,
                                               ColStructList& colStructList, ColValueList& colValueList,
                                               DctnryStructList& dctnryStructList, DictStrList& dictStrList,
                                               const int32_t tableOid)
{
  int rc;
  RID* rowIdArray = NULL;
  ColTupleList curTupleList;
  Column curCol;
  ColStruct curColStruct;
  ColValueList colOldValueList;
  ColValueList colNewValueList;
  ColStructList newColStructList;
  DctnryStructList newDctnryStructList;
  HWM hwm = 0;
  HWM newHwm = 0;
  HWM oldHwm = 0;
  ColTupleList::size_type totalRow;
  ColStructList::size_type totalColumns;
  uint64_t rowsLeft = 0;
  bool newExtent = false;
  RIDList ridList;
  ColumnOp* colOp = NULL;
  uint32_t i = 0;

#ifdef PROFILE
  StopWatch timer;
#endif

  m_opType = INSERT;

  // debug information for testing
  if (isDebug(DEBUG_2))
  {
    printInputValue(colStructList, colValueList, ridList, dctnryStructList, dictStrList);
  }

  // Convert data type and column width to write engine specific
  for (i = 0; i < colStructList.size(); i++)
    Convertor::convertColType(&colStructList[i]);

  uint32_t colId = 0;
  // MCOL-1675: find the smallest column width to calculate the RowID from so
  // that all HWMs will be incremented by this operation
  findSmallestColumn(colId, colStructList);

  rc = checkValid(txnid, colStructList, colValueList, ridList);

  if (rc != NO_ERROR)
    return rc;

  setTransId(txnid);

  curTupleList = static_cast<ColTupleList>(colValueList[0]);
  totalRow = curTupleList.size();
  totalColumns = colStructList.size();
  rowIdArray = new RID[totalRow];
  // use scoped_array to ensure ptr deletion regardless of where we return
  boost::scoped_array<RID> rowIdArrayPtr(rowIdArray);
  memset(rowIdArray, 0, (sizeof(RID) * totalRow));

  //--------------------------------------------------------------------------
  // allocate row id(s)
  //--------------------------------------------------------------------------
  curColStruct = colStructList[colId];
  colOp = m_colOp[op(curColStruct.fCompressionType)];

  colOp->initColumn(curCol);

  // Get the correct segment, partition, column file
  uint16_t dbRoot;
  uint16_t segmentNum = 0;
  uint32_t partitionNum = 0;
  // Don't search for empty space, always append to the end. May revisit later
  dbRoot = curColStruct.fColDbRoot;
  int extState;
  bool bStartExtFound;
  bool bUseStartExtent = false;
  RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(curColStruct.dataOid, dbRoot, partitionNum,
                                                               segmentNum, hwm, extState, bStartExtFound));

  if ((bStartExtFound) && (extState == BRM::EXTENTAVAILABLE))
    bUseStartExtent = true;

  for (i = 0; i < colStructList.size(); i++)
  {
    colStructList[i].fColPartition = partitionNum;
    colStructList[i].fColSegment = segmentNum;
    colStructList[i].fColDbRoot = dbRoot;
  }

  for (i = 0; i < dctnryStructList.size(); i++)
  {
    dctnryStructList[i].fColPartition = partitionNum;
    dctnryStructList[i].fColSegment = segmentNum;
    dctnryStructList[i].fColDbRoot = dbRoot;
  }

  oldHwm = hwm;  // Save this info for rollback
  // need to pass real dbRoot, partition, and segment to setColParam
  colOp->setColParam(curCol, colId, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType,
                     curColStruct.dataOid, curColStruct.fCompressionType, dbRoot, partitionNum, segmentNum);
  colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType);
  string segFile;

  if (bUseStartExtent)
  {
    rc = colOp->openColumnFile(curCol, segFile, true);  // @bug 5572 HDFS tmp file

    if (rc != NO_ERROR)
    {
      return rc;
    }
  }

  bool newFile;

#ifdef PROFILE
  timer.start("allocRowId");
#endif
  newColStructList = colStructList;
  newDctnryStructList = dctnryStructList;
  std::vector<boost::shared_ptr<DBRootExtentTracker> > dbRootExtentTrackers;
  rc = colOp->allocRowId(txnid, bUseStartExtent, curCol, (uint64_t)totalRow, rowIdArray, hwm, newExtent,
                         rowsLeft, newHwm, newFile, newColStructList, newDctnryStructList,
                         dbRootExtentTrackers, false, false, 0, false, NULL);

  //--------------------------------------------------------------------------
  // Handle case where we ran out of disk space allocating a new extent.
  // Rollback extentmap and delete any db files that were created.
  //--------------------------------------------------------------------------
  if (rc != NO_ERROR)
  {
    if ((rc == ERR_FILE_DISK_SPACE) && newExtent)
    {
      vector<ExtentInfo> colExtentInfo;
      vector<ExtentInfo> dictExtentInfo;
      vector<ExtentInfo> fileInfo;
      ExtentInfo info;

      for (i = 0; i < newColStructList.size(); i++)
      {
        info.oid = newColStructList[i].dataOid;
        info.partitionNum = newColStructList[i].fColPartition;
        info.segmentNum = newColStructList[i].fColSegment;
        info.dbRoot = newColStructList[i].fColDbRoot;

        if (newFile)
          fileInfo.push_back(info);

        colExtentInfo.push_back(info);
      }

      int rc1 = BRMWrapper::getInstance()->deleteEmptyColExtents(colExtentInfo);

      // Only rollback dictionary extents "if" store file is new
      if ((rc1 == 0) && newFile)
      {
        for (unsigned int j = 0; j < fileInfo.size(); j++)
        {
          // ignore return code and delete what we can
          rc1 = colOp->deleteFile(fileInfo[j].oid, fileInfo[j].dbRoot, fileInfo[j].partitionNum,
                                  fileInfo[j].segmentNum);
        }

        fileInfo.clear();

        for (i = 0; i < newDctnryStructList.size(); i++)
        {
          if (newDctnryStructList[i].dctnryOid > 0)
          {
            info.oid = newDctnryStructList[i].dctnryOid;
            info.partitionNum = newDctnryStructList[i].fColPartition;
            info.segmentNum = newDctnryStructList[i].fColSegment;
            info.dbRoot = newDctnryStructList[i].fColDbRoot;
            info.newFile = true;
            fileInfo.push_back(info);
            dictExtentInfo.push_back(info);
          }
        }

        if (dictExtentInfo.size() > 0)
        {
          FileOp fileOp;
          rc1 = BRMWrapper::getInstance()->deleteEmptyDictStoreExtents(dictExtentInfo);

          if (rc1 != NO_ERROR)
            return rc;

          for (unsigned j = 0; j < fileInfo.size(); j++)
          {
            rc1 = fileOp.deleteFile(fileInfo[j].oid, fileInfo[j].dbRoot, fileInfo[j].partitionNum,
                                    fileInfo[j].segmentNum);
          }
        }
      }
    }  // disk space error allocating new extent

    return rc;
  }  // rc != NO_ERROR from call to allocRowID()

#ifdef PROFILE
  timer.stop("allocRowId");
#endif

  TableMetaData* aTableMetaData = TableMetaData::makeTableMetaData(tableOid);

  //--------------------------------------------------------------------------
  // Expand initial abbreviated extent if any RID in 1st extent is > 256K.
  // if totalRow == rowsLeft, then not adding rows to 1st extent, so skip it.
  //--------------------------------------------------------------------------
  // DMC-SHARED_NOTHING_NOTE: Is it safe to assume only part0 seg0 is abbreviated?
  if ((colStructList[colId].fColPartition == 0) && (colStructList[colId].fColSegment == 0) &&
      ((totalRow - rowsLeft) > 0) &&
      (rowIdArray[totalRow - rowsLeft - 1] >= (RID)INITIAL_EXTENT_ROWS_TO_DISK))
  {
    for (unsigned k = 0; k < colStructList.size(); k++)
    {
      if (k == colId)
        continue;
      Column expandCol;
      colOp = m_colOp[op(colStructList[k].fCompressionType)];
      colOp->setColParam(expandCol, 0, colStructList[k].colWidth, colStructList[k].colDataType,
                         colStructList[k].colType, colStructList[k].dataOid,
                         colStructList[k].fCompressionType, colStructList[k].fColDbRoot,
                         colStructList[k].fColPartition, colStructList[k].fColSegment);
      colOp->findTypeHandler(colStructList[k].colWidth, colStructList[k].colDataType);
      rc = colOp->openColumnFile(expandCol, segFile, true);  // @bug 5572 HDFS tmp file

      if (rc == NO_ERROR)
      {
        if (colOp->abbreviatedExtent(expandCol.dataFile.pFile, colStructList[k].colWidth))
        {
          rc = colOp->expandAbbrevExtent(expandCol);
        }
      }

      colOp->clearColumn(expandCol);  // closes the file

      if (rc != NO_ERROR)
      {
        return rc;
      }
    }  // loop through columns
  }    // if starting extent needs to be expanded

  //--------------------------------------------------------------------------
  // Tokenize data if needed
  //--------------------------------------------------------------------------
  dictStr::iterator dctStr_iter;
  ColTupleList::iterator col_iter;

  for (unsigned i = 0; i < colStructList.size(); i++)
  {
    if (colStructList[i].tokenFlag)
    {
      dctStr_iter = dictStrList[i].begin();
      col_iter = colValueList[i].begin();
      Dctnry* dctnry = m_dctnry[op(dctnryStructList[i].fCompressionType)];

      ColExtsInfo aColExtsInfo = aTableMetaData->getColExtsInfo(dctnryStructList[i].dctnryOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      if (bUseStartExtent)
      {
        rc = dctnry->openDctnry(dctnryStructList[i].dctnryOid, dctnryStructList[i].fColDbRoot,
                                dctnryStructList[i].fColPartition, dctnryStructList[i].fColSegment,
                                true);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)
          return rc;

        while (it != aColExtsInfo.end())
        {
          if ((it->dbRoot == dctnryStructList[i].fColDbRoot) &&
              (it->partNum == dctnryStructList[i].fColPartition) &&
              (it->segNum == dctnryStructList[i].fColSegment))
            break;

          it++;
        }

        if (it == aColExtsInfo.end())  // add this one to the list
        {
          ColExtInfo aExt;
          aExt.dbRoot = dctnryStructList[i].fColDbRoot;
          aExt.partNum = dctnryStructList[i].fColPartition;
          aExt.segNum = dctnryStructList[i].fColSegment;
          aExt.compType = dctnryStructList[i].fCompressionType;
          aExt.isDict = true;
          aColExtsInfo.push_back(aExt);
          aTableMetaData->setColExtsInfo(dctnryStructList[i].dctnryOid, aColExtsInfo);
        }

        for (uint32_t rows = 0; rows < (totalRow - rowsLeft); rows++)
        {
          if (dctStr_iter->isNull())
          {
            Token nullToken;
            col_iter->data = nullToken;
          }
          else
          {
#ifdef PROFILE
            timer.start("tokenize");
#endif
            DctnryTuple dctTuple;
            dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
            dctTuple.sigSize = dctStr_iter->length();
            dctTuple.isNull = false;
            rc = tokenize(txnid, dctTuple, dctnryStructList[i].fCompressionType);

            if (rc != NO_ERROR)
            {
              dctnry->closeDctnry();
              return rc;
            }

#ifdef PROFILE
            timer.stop("tokenize");
#endif
            col_iter->data = dctTuple.token;
          }

          dctStr_iter++;
          col_iter++;
        }

        // close dictionary files
        rc = dctnry->closeDctnry();

        if (rc != NO_ERROR)
          return rc;
      }  // tokenize dictionary rows in 1st extent

      if (newExtent)
      {
        rc = dctnry->openDctnry(newDctnryStructList[i].dctnryOid, newDctnryStructList[i].fColDbRoot,
                                newDctnryStructList[i].fColPartition, newDctnryStructList[i].fColSegment,
                                false);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)
          return rc;

        aColExtsInfo = aTableMetaData->getColExtsInfo(newDctnryStructList[i].dctnryOid);
        it = aColExtsInfo.begin();

        while (it != aColExtsInfo.end())
        {
          if ((it->dbRoot == newDctnryStructList[i].fColDbRoot) &&
              (it->partNum == newDctnryStructList[i].fColPartition) &&
              (it->segNum == newDctnryStructList[i].fColSegment))
            break;

          it++;
        }

        if (it == aColExtsInfo.end())  // add this one to the list
        {
          ColExtInfo aExt;
          aExt.dbRoot = newDctnryStructList[i].fColDbRoot;
          aExt.partNum = newDctnryStructList[i].fColPartition;
          aExt.segNum = newDctnryStructList[i].fColSegment;
          aExt.compType = newDctnryStructList[i].fCompressionType;
          aExt.isDict = true;
          aColExtsInfo.push_back(aExt);
          aTableMetaData->setColExtsInfo(newDctnryStructList[i].dctnryOid, aColExtsInfo);
        }

        for (uint32_t rows = 0; rows < rowsLeft; rows++)
        {
          if (dctStr_iter->isNull())
          {
            Token nullToken;
            col_iter->data = nullToken;
          }
          else
          {
#ifdef PROFILE
            timer.start("tokenize");
#endif
            DctnryTuple dctTuple;
            dctTuple.sigValue = (unsigned char*)dctStr_iter->str();
            dctTuple.sigSize = dctStr_iter->length();
            dctTuple.isNull = false;
            rc = tokenize(txnid, dctTuple, newDctnryStructList[i].fCompressionType);

            if (rc != NO_ERROR)
            {
              dctnry->closeDctnry();
              return rc;
            }

#ifdef PROFILE
            timer.stop("tokenize");
#endif
            col_iter->data = dctTuple.token;
          }

          dctStr_iter++;
          col_iter++;
        }

        // close dictionary files
        rc = dctnry->closeDctnry();

        if (rc != NO_ERROR)
          return rc;
      }  // tokenize dictionary rows in second extent
    }    // tokenize dictionary columns
  }      // loop through columns to see which ones need tokenizing

  //----------------------------------------------------------------------
  // Update column info structure @Bug 1862 set hwm, and
  // Prepare ValueList for new extent (if applicable)
  //----------------------------------------------------------------------
  //@Bug 2205 Check whether all rows go to the new extent
  RID lastRid = 0;
  RID lastRidNew = 0;

  if (totalRow - rowsLeft > 0)
  {
    lastRid = rowIdArray[totalRow - rowsLeft - 1];
    lastRidNew = rowIdArray[totalRow - 1];
  }
  else
  {
    lastRid = 0;
    lastRidNew = rowIdArray[totalRow - 1];
  }

  // if a new extent is created, all the columns in this table should
  // have their own new extent

  //@Bug 1701. Close the file
  if (bUseStartExtent)
  {
    m_colOp[op(curCol.compressionType)]->clearColumn(curCol);
  }

  std::vector<BulkSetHWMArg> hwmVecNewext;
  std::vector<BulkSetHWMArg> hwmVecOldext;

  if (newExtent)  // Save all hwms to set them later.
  {
    BulkSetHWMArg aHwmEntryNew;
    BulkSetHWMArg aHwmEntryOld;

    bool succFlag = false;
    unsigned colWidth = 0;
    int curFbo = 0, curBio;

    for (i = 0; i < totalColumns; i++)
    {
      colOp = m_colOp[op(newColStructList[i].fCompressionType)];

      // @Bug 2714 need to set hwm for the old extent
      colWidth = colStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      if (succFlag)
      {
        if ((HWM)curFbo > oldHwm)
        {
          aHwmEntryOld.oid = colStructList[i].dataOid;
          aHwmEntryOld.partNum = colStructList[i].fColPartition;
          aHwmEntryOld.segNum = colStructList[i].fColSegment;
          aHwmEntryOld.hwm = curFbo;
          hwmVecOldext.push_back(aHwmEntryOld);
        }
      }
      else
        return ERR_INVALID_PARAM;

      colWidth = newColStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRidNew, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      if (succFlag)
      {
        aHwmEntryNew.oid = newColStructList[i].dataOid;
        aHwmEntryNew.partNum = newColStructList[i].fColPartition;
        aHwmEntryNew.segNum = newColStructList[i].fColSegment;
        aHwmEntryNew.hwm = curFbo;
        hwmVecNewext.push_back(aHwmEntryNew);
      }
    }

    //----------------------------------------------------------------------
    // Prepare the valuelist for the new extent
    //----------------------------------------------------------------------
    ColTupleList colTupleList;
    ColTupleList newColTupleList;
    ColTupleList firstPartTupleList;

    for (unsigned i = 0; i < totalColumns; i++)
    {
      colTupleList = static_cast<ColTupleList>(colValueList[i]);

      for (uint64_t j = rowsLeft; j > 0; j--)
      {
        newColTupleList.push_back(colTupleList[totalRow - j]);
      }

      colNewValueList.push_back(newColTupleList);

      newColTupleList.clear();

      // upate the oldvalue list for the old extent
      for (uint64_t j = 0; j < (totalRow - rowsLeft); j++)
      {
        firstPartTupleList.push_back(colTupleList[j]);
      }

      colOldValueList.push_back(firstPartTupleList);
      firstPartTupleList.clear();
    }
  }

  //--------------------------------------------------------------------------
  // Mark extents invalid
  //--------------------------------------------------------------------------
  // WIP
  // Set min/max in dmlprocprocessor if aplicable
  vector<BRM::LBID_t> lbids;
  vector<CalpontSystemCatalog::ColDataType> colDataTypes;
  bool successFlag = true;
  unsigned width = 0;
  int curFbo = 0, curBio, lastFbo = -1;
  lastRid = rowIdArray[totalRow - 1];

  for (unsigned i = 0; i < colStructList.size(); i++)
  {
    colOp = m_colOp[op(colStructList[i].fCompressionType)];
    width = colStructList[i].colWidth;
    successFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

    if (successFlag)
    {
      if (curFbo != lastFbo)
      {
        colDataTypes.push_back(colStructList[i].colDataType);
        RETURN_ON_ERROR(AddLBIDtoList(txnid, colStructList[i], curFbo));
      }
    }
  }

  markTxnExtentsAsInvalid(txnid);

  //--------------------------------------------------------------------------
  // Write row(s) to database file(s)
  //--------------------------------------------------------------------------
#ifdef PROFILE
  timer.start("writeColumnRec");
#endif

  if (rc == NO_ERROR)
  {
    if (newExtent)
    {
      rc = writeColumnRec(txnid, cscColTypeList, colStructList, colOldValueList, rowIdArray, newColStructList,
                          colNewValueList, tableOid,
                          false);  // @bug 5572 HDFS tmp file
    }
    else
    {
      rc = writeColumnRec(txnid, cscColTypeList, colStructList, colValueList, rowIdArray, newColStructList,
                          colNewValueList, tableOid,
                          true);  // @bug 5572 HDFS tmp file
    }
  }

#ifdef PROFILE
  timer.stop("writeColumnRec");
#endif

  //--------------------------------------------------------------------------
  // Update BRM
  //--------------------------------------------------------------------------
  if (!newExtent)
  {
    bool succFlag = false;
    unsigned colWidth = 0;
    int extState;
    bool extFound;
    int curFbo = 0, curBio;
    std::vector<BulkSetHWMArg> hwmVec;

    for (unsigned i = 0; i < totalColumns; i++)
    {
      // Set all columns hwm together
      BulkSetHWMArg aHwmEntry;
      RETURN_ON_ERROR(BRMWrapper::getInstance()->getLastHWM_DBroot(
          colStructList[i].dataOid, colStructList[i].fColDbRoot, colStructList[i].fColPartition,
          colStructList[i].fColSegment, hwm, extState, extFound));
      colWidth = colStructList[i].colWidth;
      succFlag = colOp->calculateRowId(lastRid, BYTE_PER_BLOCK / colWidth, colWidth, curFbo, curBio);

      // cout << "insertcolumnrec   oid:rid:fbo:hwm = " <<
      // colStructList[i].dataOid << ":" << lastRid << ":" <<
      // curFbo << ":" << hwm << endl;
      if (succFlag)
      {
        if ((HWM)curFbo > hwm)
        {
          aHwmEntry.oid = colStructList[i].dataOid;
          aHwmEntry.partNum = colStructList[i].fColPartition;
          aHwmEntry.segNum = colStructList[i].fColSegment;
          aHwmEntry.hwm = curFbo;
          hwmVec.push_back(aHwmEntry);
        }
      }
      else
        return ERR_INVALID_PARAM;
    }

    std::vector<BRM::CPInfoMerge> mergeCPDataArgs;
    RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVec, mergeCPDataArgs));
  }
  else  // if (newExtent)
  {
#ifdef PROFILE
    timer.start("flushVMCache");
#endif
    std::vector<BRM::CPInfoMerge> mergeCPDataArgs;

    if (hwmVecNewext.size() > 0)
      RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVecNewext, mergeCPDataArgs));

    if (hwmVecOldext.size() > 0)
      RETURN_ON_ERROR(BRMWrapper::getInstance()->bulkSetHWMAndCP(hwmVecOldext, mergeCPDataArgs));

#ifdef PROFILE
    timer.stop("flushVMCache");
#endif
  }

#ifdef PROFILE
  timer.finish();
#endif
  return rc;
}

/*@brief printInputValue - Print input value
 */
/***********************************************************
 * DESCRIPTION:
 *    Print input value
 * PARAMETERS:
 *    tableOid - table object id
 *    colStructList - column struct list
 *    colValueList - column value list
 *    ridList - RID list
 * RETURN:
 *    none
 ***********************************************************/
void WriteEngineWrapper::printInputValue(const ColStructList& colStructList, const ColValueList& colValueList,
                                         const RIDList& ridList, const DctnryStructList& dctnryStructList,
                                         const DictStrList& dictStrList) const
{
  ColTupleList curTupleList;
  ColStruct curColStruct;
  ColTuple curTuple;
  string curStr;
  ColStructList::size_type i;
  size_t j;
  OidToIdxMap oidToIdxMap;

  std::cerr << std::endl << "=========================" << std::endl;

  std::cerr << "Total RIDs: " << ridList.size() << std::endl;

  for (i = 0; i < ridList.size(); i++)
    std::cerr << "RID[" << i << "] : " << ridList[i] << std::endl;

  std::cerr << "Total Columns: " << colStructList.size() << std::endl;

  for (i = 0; i < colStructList.size(); i++)
  {
    curColStruct = colStructList[i];
    curTupleList = colValueList[i];
    if (curColStruct.tokenFlag)
    {
      oidToIdxMap.insert({curColStruct.dataOid, i});
      continue;
    }

    std::cerr << "Column[" << i << "]";
    std::cerr << "Data file OID : " << curColStruct.dataOid << "\t";
    std::cerr << "Width : " << curColStruct.colWidth << "\t"
              << " Type: " << curColStruct.colDataType << std::endl;
    std::cerr << "Total values : " << curTupleList.size() << std::endl;

    for (j = 0; j < curTupleList.size(); j++)
    {
      curTuple = curTupleList[j];

      try
      {
        if (curTuple.data.type() == typeid(int))
          curStr = boost::lexical_cast<string>(boost::any_cast<int>(curTuple.data));
        else if (curTuple.data.type() == typeid(unsigned int))
          curStr = boost::lexical_cast<string>(boost::any_cast<unsigned int>(curTuple.data));
        else if (curTuple.data.type() == typeid(float))
          curStr = boost::lexical_cast<string>(boost::any_cast<float>(curTuple.data));
        else if (curTuple.data.type() == typeid(long long))
          curStr = boost::lexical_cast<string>(boost::any_cast<long long>(curTuple.data));
        else if (curTuple.data.type() == typeid(unsigned long long))
          curStr = boost::lexical_cast<string>(boost::any_cast<unsigned long long>(curTuple.data));
        else if (curTuple.data.type() == typeid(int128_t))
          curStr = datatypes::TSInt128(boost::any_cast<int128_t>(curTuple.data)).toString();
        else if (curTuple.data.type() == typeid(double))
          curStr = boost::lexical_cast<string>(boost::any_cast<double>(curTuple.data));
        else if (curTuple.data.type() == typeid(short))
          curStr = boost::lexical_cast<string>(boost::any_cast<short>(curTuple.data));
        else if (curTuple.data.type() == typeid(unsigned short))
          curStr = boost::lexical_cast<string>(boost::any_cast<unsigned short>(curTuple.data));
        else if (curTuple.data.type() == typeid(char))
          curStr = boost::lexical_cast<string>(boost::any_cast<char>(curTuple.data));
        else
          curStr = boost::any_cast<string>(curTuple.data);
      }
      catch (...)
      {
      }

      if (isDebug(DEBUG_3))
        std::cerr << "Value[" << j << "]: " << curStr.c_str() << std::endl;
    }
  }
  for (i = 0; i < dctnryStructList.size(); ++i)
  {
    if (dctnryStructList[i].dctnryOid == 0)
      continue;
    std::cerr << "Dict[" << i << "]";
    std::cerr << " file OID : " << dctnryStructList[i].dctnryOid
              << " Token file OID: " << dctnryStructList[i].columnOid << "\t";
    std::cerr << "Width : " << dctnryStructList[i].colWidth << "\t"
              << " Type: " << dctnryStructList[i].fCompressionType << std::endl;
    std::cerr << "Total values : " << dictStrList.size() << std::endl;
    if (isDebug(DEBUG_3))
    {
      for (j = 0; j < dictStrList[i].size(); ++j)
      {
        // We presume there will be a value.
        auto tokenOidIdx = oidToIdxMap[dctnryStructList[i].columnOid];
        std::cerr << "string [" << dictStrList[i][j].safeString("<<null>>") << "]" << std::endl;
        bool isToken = colStructList[tokenOidIdx].colType == WriteEngine::WR_TOKEN &&
                       colStructList[tokenOidIdx].tokenFlag;
        if (isToken && !colValueList[tokenOidIdx][j].data.empty())
        {
          Token t = boost::any_cast<Token>(colValueList[tokenOidIdx][j].data);
          std::cerr << "Token: block pos:[" << t.op << "] fbo: [" << t.fbo << "] bc: [" << t.bc << "]"
                    << std::endl;
        }
      }
    }
  }

  std::cerr << "=========================" << std::endl;
}

/***********************************************************
 * DESCRIPTION:
 *    Process version buffer before any write operation
 * PARAMETERS:
 *    txnid - transaction id
 *    oid - column oid
 *    totalRow - total number of rows
 *    rowIdArray - rowid array
 * RETURN:
 *    NO_ERROR if success
 *    others if something wrong in inserting the value
 ***********************************************************/
int WriteEngineWrapper::processVersionBuffer(IDBDataFile* pFile, const TxnID& txnid,
                                             const ColStruct& colStruct, int width, int totalRow,
                                             const RID* rowIdArray, vector<LBIDRange>& rangeList)
{
  if (idbdatafile::IDBPolicy::useHdfs())
    return 0;

  RID curRowId;
  int rc = NO_ERROR;
  int curFbo = 0, curBio, lastFbo = -1;
  bool successFlag;
  BRM::LBID_t lbid;
  BRM::VER_t verId = (BRM::VER_t)txnid;
  vector<uint32_t> fboList;
  LBIDRange range;
  ColumnOp* colOp = m_colOp[op(colStruct.fCompressionType)];

  for (int i = 0; i < totalRow; i++)
  {
    curRowId = rowIdArray[i];
    // cout << "processVersionBuffer got rid " << curRowId << endl;
    successFlag = colOp->calculateRowId(curRowId, BYTE_PER_BLOCK / width, width, curFbo, curBio);

    if (successFlag)
    {
      if (curFbo != lastFbo)
      {
        // cout << "processVersionBuffer is processing lbid  " << lbid << endl;
        RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(colStruct.dataOid, colStruct.fColPartition,
                                                              colStruct.fColSegment, curFbo, lbid));
        // cout << "processVersionBuffer is processing lbid  " << lbid << endl;
        fboList.push_back((uint32_t)curFbo);
        range.start = lbid;
        range.size = 1;
        rangeList.push_back(range);
      }

      lastFbo = curFbo;
    }
  }

  std::vector<VBRange> freeList;
  rc = BRMWrapper::getInstance()->writeVB(pFile, verId, colStruct.dataOid, fboList, rangeList, colOp,
                                          freeList, colStruct.fColDbRoot);

  return rc;
}

int WriteEngineWrapper::processVersionBuffers(IDBDataFile* pFile, const TxnID& txnid,
                                              const ColStruct& colStruct, int width, int totalRow,
                                              const RIDList& ridList, vector<LBIDRange>& rangeList)
{
  if (idbdatafile::IDBPolicy::useHdfs())
    return 0;

  RID curRowId;
  int rc = NO_ERROR;
  int curFbo = 0, curBio, lastFbo = -1;
  bool successFlag;
  BRM::LBID_t lbid;
  BRM::VER_t verId = (BRM::VER_t)txnid;
  LBIDRange range;
  vector<uint32_t> fboList;
  // vector<LBIDRange>   rangeList;
  ColumnOp* colOp = m_colOp[op(colStruct.fCompressionType)];

  for (int i = 0; i < totalRow; i++)
  {
    curRowId = ridList[i];
    // cout << "processVersionBuffer got rid " << curRowId << endl;
    successFlag = colOp->calculateRowId(curRowId, BYTE_PER_BLOCK / width, width, curFbo, curBio);

    if (successFlag)
    {
      if (curFbo != lastFbo)
      {
        // cout << "processVersionBuffer is processing lbid  " << lbid << endl;
        RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(colStruct.dataOid, colStruct.fColPartition,
                                                              colStruct.fColSegment, curFbo, lbid));
        // cout << "processVersionBuffer is processing lbid  " << lbid << endl;
        fboList.push_back((uint32_t)curFbo);
        range.start = lbid;
        range.size = 1;
        rangeList.push_back(range);
      }

      lastFbo = curFbo;
    }
  }

  // cout << "calling writeVB with blocks " << rangeList.size() << endl;
  std::vector<VBRange> freeList;
  rc = BRMWrapper::getInstance()->writeVB(pFile, verId, colStruct.dataOid, fboList, rangeList, colOp,
                                          freeList, colStruct.fColDbRoot);

  return rc;
}

int WriteEngineWrapper::processBeginVBCopy(const TxnID& txnid, const vector<ColStruct>& colStructList,
                                           const RIDList& ridList, std::vector<VBRange>& freeList,
                                           vector<vector<uint32_t> >& fboLists,
                                           vector<vector<LBIDRange> >& rangeLists,
                                           vector<LBIDRange>& rangeListTot)
{
  if (idbdatafile::IDBPolicy::useHdfs())
    return 0;

  RID curRowId;
  int rc = NO_ERROR;
  int curFbo = 0, curBio, lastFbo = -1;
  bool successFlag;
  BRM::LBID_t lbid;
  LBIDRange range;

  // StopWatch timer;
  // timer.start("calculation");
  for (uint32_t j = 0; j < colStructList.size(); j++)
  {
    vector<uint32_t> fboList;
    vector<LBIDRange> rangeList;
    lastFbo = -1;
    ColumnOp* colOp = m_colOp[op(colStructList[j].fCompressionType)];

    ColStruct curColStruct = colStructList[j];
    Convertor::convertColType(&curColStruct);

    for (uint32_t i = 0; i < ridList.size(); i++)
    {
      curRowId = ridList[i];
      // cout << "processVersionBuffer got rid " << curRowId << endl;
      successFlag = colOp->calculateRowId(curRowId, BYTE_PER_BLOCK / curColStruct.colWidth,
                                          curColStruct.colWidth, curFbo, curBio);

      if (successFlag)
      {
        if (curFbo != lastFbo)
        {
          // cout << "processVersionBuffer is processing curFbo  " << curFbo << endl;
          RETURN_ON_ERROR(BRMWrapper::getInstance()->getBrmInfo(colStructList[j].dataOid,
                                                                colStructList[j].fColPartition,
                                                                colStructList[j].fColSegment, curFbo, lbid));
          // cout << "beginVBCopy is processing lbid:transaction  " << lbid <<":"<<txnid<< endl;
          fboList.push_back((uint32_t)curFbo);
          range.start = lbid;
          range.size = 1;
          rangeList.push_back(range);
        }

        lastFbo = curFbo;
      }
    }

    BRMWrapper::getInstance()->pruneLBIDList(txnid, &rangeList, &fboList);
    rangeLists.push_back(rangeList);

    fboLists.push_back(fboList);
    rangeListTot.insert(rangeListTot.end(), rangeList.begin(), rangeList.end());
  }

  if (rangeListTot.size() > 0)
    rc = BRMWrapper::getInstance()->getDbrmObject()->beginVBCopy(txnid, colStructList[0].fColDbRoot,
                                                                 rangeListTot, freeList);

  // timer.stop("beginVBCopy");
  // timer.finish();
  return rc;
}

/**
 * @brief Process versioning for batch insert - only version the hwm block.
 */
#if 0
int WriteEngineWrapper::processBatchVersions(const TxnID& txnid, std::vector<Column> columns, std::vector<BRM::LBIDRange>&   rangeList)
{
    int rc = 0;
    std::vector<DbFileOp*> fileOps;

    //open the column files
    for ( unsigned i = 0; i < columns.size(); i++)
    {
        ColumnOp* colOp = m_colOp[op(columns[i].compressionType)];
        Column curCol;
        // set params
        colOp->initColumn(curCol);
        ColType colType;
        Convertor::convertColType(columns[i].colDataType, colType);
        colOp->setColParam(curCol, 0, columns[i].colWidth,
                           columns[i].colDataType, colType, columns[i].dataFile.oid,
                           columns[i].compressionType,
                           columns[i].dataFile.fDbRoot, columns[i].dataFile.fPartition, columns[i].dataFile.fSegment);
        string segFile;
        rc = colOp->openColumnFile(curCol, segFile, IO_BUFF_SIZE);

        if (rc != NO_ERROR)
            break;

        columns[i].dataFile.pFile = curCol.dataFile.pFile;
        fileOps.push_back(colOp);
    }

    if ( rc == 0)
    {
        BRM::VER_t  verId = (BRM::VER_t) txnid;
        rc = BRMWrapper::getInstance()->writeBatchVBs(verId, columns, rangeList, fileOps);
    }

    //close files
    for ( unsigned i = 0; i < columns.size(); i++)
    {
        ColumnOp* colOp = dynamic_cast<ColumnOp*> (fileOps[i]);
        Column curCol;
        // set params
        colOp->initColumn(curCol);
        ColType colType;
        Convertor::convertColType(columns[i].colDataType, colType);
        colOp->setColParam(curCol, 0, columns[i].colWidth,
                           columns[i].colDataType, colType, columns[i].dataFile.oid,
                           columns[i].compressionType,
                           columns[i].dataFile.fDbRoot, columns[i].dataFile.fPartition, columns[i].dataFile.fSegment);
        curCol.dataFile.pFile = columns[i].dataFile.pFile;
        colOp->clearColumn(curCol);
    }

    return rc;
}
#endif
void WriteEngineWrapper::writeVBEnd(const TxnID& txnid, std::vector<BRM::LBIDRange>& rangeList)
{
  if (idbdatafile::IDBPolicy::useHdfs())
    return;

  BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
}

int WriteEngineWrapper::updateColumnRec(const TxnID& txnid, const vector<CSCTypesList>& colExtentsColType,
                                        vector<ColStructList>& colExtentsStruct, ColValueList& colValueList,
                                        vector<void*>& colOldValueList, vector<RIDList>& ridLists,
                                        vector<DctnryStructList>& dctnryExtentsStruct,
                                        DctnryValueList& dctnryValueList, const int32_t tableOid,
                                        bool hasAUXCol)
{
  int rc = 0;
  unsigned numExtents = colExtentsStruct.size();
  ColStructList colStructList;
  DctnryStructList dctnryStructList;
  WriteEngine::CSCTypesList cscColTypeList;
  ColumnOp* colOp = NULL;
  ExtCPInfoList infosToUpdate;

  for (unsigned extent = 0; extent < numExtents; extent++)
  {
    colStructList = colExtentsStruct[extent];
    dctnryStructList = dctnryExtentsStruct[extent];
    cscColTypeList = colExtentsColType[extent];

    if (m_opType != DELETE)
    {
      // Tokenize data if needed
      vector<Token> tokenList;

      DctColTupleList::iterator dctCol_iter;
      ColTupleList::iterator col_iter;

      for (unsigned i = 0; i < colStructList.size(); i++)
      {
        if (colStructList[i].tokenFlag)
        {
          // only need to tokenize once
          dctCol_iter = dctnryValueList[i].begin();
          Token token;

          if (!dctCol_iter->isNull)
          {
            RETURN_ON_ERROR_REPORT(
                tokenize(txnid, dctnryStructList[i], *dctCol_iter, true));  // @bug 5572 HDFS tmp file
            token = dctCol_iter->token;

#ifdef PROFILE
// timer.stop("tokenize");
#endif
          }

          tokenList.push_back(token);
        }
      }

      int dicPos = 0;

      for (unsigned i = 0; i < colStructList.size(); i++)
      {
        if (colStructList[i].tokenFlag)
        {
          // only need to tokenize once
          col_iter = colValueList[i].begin();

          while (col_iter != colValueList[i].end())
          {
            col_iter->data = tokenList[dicPos];
            col_iter++;
          }

          dicPos++;
        }
      }
    }

    RIDList::iterator rid_iter;
    // Mark extents invalid
    bool successFlag = true;
    unsigned width = 0;
    int curFbo = 0, curBio, lastFbo = -1;
    rid_iter = ridLists[extent].begin();
    RID aRid = *rid_iter;

    ExtCPInfoList currentExtentRanges;
    for (const auto& colStruct : colStructList)
    {
      currentExtentRanges.push_back(
          ExtCPInfo(colStruct.colDataType, colStruct.colWidth));  // temporary for each extent.
    }
    std::vector<ExtCPInfo*> currentExtentRangesPtrs(colStructList.size(), NULL);  // pointers for each extent.

    if (m_opType != DELETE)
      m_opType = UPDATE;

    for (unsigned j = 0; j < colStructList.size(); j++)
    {
      colOp = m_colOp[op(colStructList[j].fCompressionType)];
      ExtCPInfo* cpInfoP = &(currentExtentRanges[j]);
      cpInfoP = getCPInfoToUpdateForUpdatableType(colStructList[j], cpInfoP, m_opType);
      currentExtentRangesPtrs[j] = cpInfoP;

      // XXX: highly dubious.
      // if (!colStructList[j].tokenFlag)
      //    continue;

      width = colOp->getCorrectRowWidth(colStructList[j].colDataType, colStructList[j].colWidth);
      successFlag = colOp->calculateRowId(aRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

      if (successFlag)
      {
        if (curFbo != lastFbo)
        {
          RETURN_ON_ERROR_REPORT(AddLBIDtoList(txnid, colStructList[j], curFbo, cpInfoP));
        }
      }
    }

    //#ifdef PROFILE
    // timer.start("markExtentsInvalid");
    //#endif

    bool hasFastDelete = false;

    if (m_opType == DELETE && hasAUXCol)
    {
      hasFastDelete = Config::getFastDelete();
    }

    if (hasFastDelete)
    {
      ColStructList colStructListAUX(1, colStructList.back());
      WriteEngine::CSCTypesList cscColTypeListAUX(1, cscColTypeList.back());
      ColValueList colValueListAUX(1, colValueList.back());
      std::vector<ExtCPInfo*> currentExtentRangesPtrsAUX(1, currentExtentRangesPtrs.back());

      rc = writeColumnRecUpdate(txnid, cscColTypeListAUX, colStructListAUX, colValueListAUX, colOldValueList,
                                ridLists[extent], tableOid, true, ridLists[extent].size(),
                                &currentExtentRangesPtrsAUX, hasAUXCol);

      for (auto& cpInfoPtr : currentExtentRangesPtrs)
      {
        if (cpInfoPtr)
        {
          cpInfoPtr->toInvalid();
        }
      }
    }
    else
    {
      rc = writeColumnRecUpdate(txnid, cscColTypeList, colStructList, colValueList, colOldValueList,
                                ridLists[extent], tableOid, true, ridLists[extent].size(),
                                &currentExtentRangesPtrs, hasAUXCol);
    }

    if (rc != NO_ERROR)
      break;

    // copy updated ranges into bulk update vector.
    for (auto cpInfoPtr : currentExtentRangesPtrs)
    {
      if (cpInfoPtr)
      {
        cpInfoPtr->fCPInfo.seqNum++;
        infosToUpdate.push_back(*cpInfoPtr);
      }
    }
  }
  markTxnExtentsAsInvalid(txnid);
  if (rc == NO_ERROR)
  {
    ExtCPInfoList infosToDrop = infosToUpdate;
    for (auto& cpInfo : infosToDrop)
    {
      cpInfo.fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE;
    }
    // ZZZZ
    rc = BRMWrapper::getInstance()->setExtentsMaxMin(infosToDrop);
    setInvalidCPInfosSpecialMarks(infosToUpdate);
    rc = BRMWrapper::getInstance()->setExtentsMaxMin(infosToUpdate);
  }
  return rc;
}

int WriteEngineWrapper::updateColumnRecs(const TxnID& txnid, const CSCTypesList& cscColTypeList,
                                         vector<ColStruct>& colExtentsStruct, ColValueList& colValueList,
                                         const RIDList& ridLists, const int32_t tableOid)
{
  // Mark extents invalid
  ColumnOp* colOp = NULL;
  bool successFlag = true;
  unsigned width = 0;
  int curFbo = 0, curBio, lastFbo = -1;
  RID aRid = ridLists[0];
  int rc = 0;
  ExtCPInfoList infosToUpdate;
  for (const auto& colStruct : colExtentsStruct)
  {
    infosToUpdate.push_back(ExtCPInfo(colStruct.colDataType, colStruct.colWidth));
  }
  ExtCPInfoList bulkUpdateInfos;
  std::vector<ExtCPInfo*> pointersToInfos;  // pointersToInfos[i] points to infosToUpdate[i] and may be NULL.

  m_opType = UPDATE;

  for (unsigned j = 0; j < colExtentsStruct.size(); j++)
  {
    colOp = m_colOp[op(colExtentsStruct[j].fCompressionType)];

    ExtCPInfo* cpInfoP = &(infosToUpdate[j]);
    cpInfoP = getCPInfoToUpdateForUpdatableType(colExtentsStruct[j], cpInfoP, m_opType);
    pointersToInfos.push_back(cpInfoP);

    width = colOp->getCorrectRowWidth(colExtentsStruct[j].colDataType, colExtentsStruct[j].colWidth);
    successFlag = colOp->calculateRowId(aRid, BYTE_PER_BLOCK / width, width, curFbo, curBio);

    if (successFlag)
    {
      if (curFbo != lastFbo)
      {
        RETURN_ON_ERROR(AddLBIDtoList(txnid, colExtentsStruct[j], curFbo, cpInfoP));
      }
    }
  }

  markTxnExtentsAsInvalid(txnid);

  if (m_opType != DELETE)
    m_opType = UPDATE;

  for (auto cpInfoP : pointersToInfos)
  {
    if (cpInfoP)
    {
      auto tmp = *cpInfoP;
      tmp.fCPInfo.seqNum = SEQNUM_MARK_INVALID_SET_RANGE;
      bulkUpdateInfos.push_back(tmp);
    }
  }
  if (!bulkUpdateInfos.empty())
  {
    rc = BRMWrapper::getInstance()->setExtentsMaxMin(bulkUpdateInfos);
  }

  rc = writeColumnRecords(txnid, cscColTypeList, colExtentsStruct, colValueList, ridLists, tableOid, true,
                          &pointersToInfos);

  if (rc == NO_ERROR)
  {
    bulkUpdateInfos.clear();
    for (auto cpInfoP : pointersToInfos)
    {
      if (cpInfoP)
      {
        cpInfoP->fCPInfo.seqNum++;
        bulkUpdateInfos.push_back(*cpInfoP);
      }
    }
    if (!bulkUpdateInfos.empty())
    {
      setInvalidCPInfosSpecialMarks(bulkUpdateInfos);
      rc = BRMWrapper::getInstance()->setExtentsMaxMin(bulkUpdateInfos);
    }
  }
  m_opType = NOOP;
  return rc;
}

int WriteEngineWrapper::writeColumnRecords(const TxnID& txnid, const CSCTypesList& cscColTypeList,
                                           vector<ColStruct>& colStructList, ColValueList& colValueList,
                                           const RIDList& ridLists, const int32_t tableOid, bool versioning,
                                           std::vector<ExtCPInfo*>* cpInfos)
{
  bool bExcp;
  int rc = 0;
  void* valArray = NULL;
  void* oldValArray = NULL;
  Column curCol;
  ColStruct curColStruct;
  CalpontSystemCatalog::ColType curColType;
  ColTupleList curTupleList;
  ColStructList::size_type totalColumn;
  ColStructList::size_type i;
  ColTupleList::size_type totalRow;
  setTransId(txnid);
  totalColumn = colStructList.size();
  totalRow = ridLists.size();

  TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);

  for (i = 0; i < totalColumn; i++)
  {
    ExtCPInfo* cpInfo = NULL;
    if (cpInfos)
    {
      cpInfo = (*cpInfos)[i];
    }
    valArray = NULL;
    oldValArray = NULL;

    curColStruct = colStructList[i];
    curColType = cscColTypeList[i];
    curTupleList = colValueList[i];
    ColumnOp* colOp = m_colOp[op(curColStruct.fCompressionType)];

    Convertor::convertColType(&curColStruct);

    // set params
    colOp->initColumn(curCol);

    colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType,
                       curColStruct.dataOid, curColStruct.fCompressionType, curColStruct.fColDbRoot,
                       curColStruct.fColPartition, curColStruct.fColSegment);
    colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType);
    ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(curColStruct.dataOid);
    ColExtsInfo::iterator it = aColExtsInfo.begin();

    while (it != aColExtsInfo.end())
    {
      if ((it->dbRoot == curColStruct.fColDbRoot) && (it->partNum == curColStruct.fColPartition) &&
          (it->segNum == curColStruct.fColSegment))
        break;

      it++;
    }

    if (it == aColExtsInfo.end())  // add this one to the list
    {
      ColExtInfo aExt;
      aExt.dbRoot = curColStruct.fColDbRoot;
      aExt.partNum = curColStruct.fColPartition;
      aExt.segNum = curColStruct.fColSegment;
      aExt.compType = curColStruct.fCompressionType;
      aExt.isDict = false;
      aColExtsInfo.push_back(aExt);
      aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
    }

    string segFile;
    rc = colOp->openColumnFile(curCol, segFile, true);  // @bug 5572 HDFS tmp file

    if (rc != NO_ERROR)
      break;

    vector<LBIDRange> rangeList;

    if (versioning)
    {
      rc = processVersionBuffers(curCol.dataFile.pFile, txnid, curColStruct, curColStruct.colWidth, totalRow,
                                 ridLists, rangeList);
    }

    if (rc != NO_ERROR)
    {
      if (curColStruct.fCompressionType == 0)
      {
        curCol.dataFile.pFile->flush();
      }

      BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
      break;
    }

    allocateValArray(valArray, totalRow, curColStruct.colType, curColStruct.colWidth);

    if (m_opType != INSERT && cpInfo)
    {
      allocateValArray(oldValArray, totalRow, curColStruct.colType, curColStruct.colWidth);
    }

    // convert values to valArray
    bExcp = false;

    try
    {
      convertValArray(totalRow, cscColTypeList[i], curColStruct.colType, curTupleList, valArray);
    }
    catch (...)
    {
      bExcp = true;
    }

    if (bExcp)
    {
      BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
      return ERR_PARSING;
    }

#ifdef PROFILE
    timer.start("writeRow ");
#endif
    rc = colOp->writeRowsValues(curCol, totalRow, ridLists, valArray, oldValArray);
#ifdef PROFILE
    timer.stop("writeRow ");
#endif
    colOp->clearColumn(curCol);

    updateMaxMinRange(totalRow, totalRow, cscColTypeList[i], curColStruct.colType, valArray, oldValArray,
                      cpInfo, false);

    if (curColStruct.fCompressionType == 0)
    {
      std::vector<BRM::FileInfo> files;
      BRM::FileInfo aFile;
      aFile.partitionNum = curColStruct.fColPartition;
      aFile.dbRoot = curColStruct.fColDbRoot;
      ;
      aFile.segmentNum = curColStruct.fColSegment;
      aFile.compType = curColStruct.fCompressionType;
      files.push_back(aFile);

      if (idbdatafile::IDBPolicy::useHdfs())
        cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());
    }

    BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

    if (valArray != NULL)
      free(valArray);

    if (oldValArray != NULL)
    {
      free(oldValArray);
    }

    // check error
    if (rc != NO_ERROR)
      break;
  }

  return rc;
}

/*@brief writeColumnRec - Write values to a column
 */
/***********************************************************
 * DESCRIPTION:
 *    Write values to a column
 * PARAMETERS:
 *    tableOid - table object id
 *    cscColTypesList - CSC ColType list
 *    colStructList - column struct list
 *    colValueList - column value list
 *    colNewStructList - the new extent struct list
 *    colNewValueList - column value list for the new extent
 *    rowIdArray -  row id list
 *    useTmpSuffix - use temp suffix for db output file
 * RETURN:
 *    NO_ERROR if success
 *    others if something wrong in inserting the value
 ***********************************************************/
int WriteEngineWrapper::writeColumnRec(const TxnID& txnid, const CSCTypesList& cscColTypeList,
                                       const ColStructList& colStructList, ColValueList& colValueList,
                                       RID* rowIdArray, const ColStructList& newColStructList,
                                       ColValueList& newColValueList, const int32_t tableOid,
                                       bool useTmpSuffix, bool versioning, ColSplitMaxMinInfoList* maxMins)
{
  bool bExcp;
  int rc = 0;
  void* valArray;
  void* oldValArray;
  string segFile;
  Column curCol;
  ColTupleList oldTupleList;
  ColStructList::size_type totalColumn;
  ColStructList::size_type i;
  ColTupleList::size_type totalRow1, totalRow2;

  setTransId(txnid);

  totalColumn = colStructList.size();
#ifdef PROFILE
  StopWatch timer;
#endif

  totalRow1 = colValueList[0].size();
  totalRow2 = 0;

  if (newColValueList.size() > 0)
    totalRow2 = newColValueList[0].size();

  TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);

  for (i = 0; i < totalColumn; i++)
  {
    if (totalRow2 > 0)
    {
      RID* secondPart = rowIdArray + totalRow1;

      //@Bug 2205 Check if all rows go to the new extent
      if (totalRow1 > 0)
      {
        // Write the first batch
        valArray = NULL;
        oldValArray = NULL;
        RID* firstPart = rowIdArray;
        ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];

        // set params
        colOp->initColumn(curCol);
        // need to pass real dbRoot, partition, and segment to setColParam
        colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType,
                           colStructList[i].colType, colStructList[i].dataOid,
                           colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
                           colStructList[i].fColPartition, colStructList[i].fColSegment);
        colOp->findTypeHandler(colStructList[i].colWidth, colStructList[i].colDataType);

        ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
        ColExtsInfo::iterator it = aColExtsInfo.begin();

        while (it != aColExtsInfo.end())
        {
          if ((it->dbRoot == colStructList[i].fColDbRoot) &&
              (it->partNum == colStructList[i].fColPartition) && (it->segNum == colStructList[i].fColSegment))
            break;

          it++;
        }

        if (it == aColExtsInfo.end())  // add this one to the list
        {
          ColExtInfo aExt;
          aExt.dbRoot = colStructList[i].fColDbRoot;
          aExt.partNum = colStructList[i].fColPartition;
          aExt.segNum = colStructList[i].fColSegment;
          aExt.compType = colStructList[i].fCompressionType;
          aColExtsInfo.push_back(aExt);
          aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
        }

        rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE);  // @bug 5572 HDFS tmp file

        if (rc != NO_ERROR)
          break;

        // handling versioning
        vector<LBIDRange> rangeList;

        if (versioning)
        {
          rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], colStructList[i].colWidth,
                                    totalRow1, firstPart, rangeList);

          if (rc != NO_ERROR)
          {
            if (colStructList[i].fCompressionType == 0)
            {
              curCol.dataFile.pFile->flush();
            }

            BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
            break;
          }
        }

        // WIP We can allocate based on column size and not colType
        // have to init the size here
        allocateValArray(valArray, totalRow1, colStructList[i].colType, colStructList[i].colWidth);

        ExtCPInfo* cpInfo = getCPInfoToUpdateForUpdatableType(
            colStructList[i], maxMins ? ((*maxMins)[i]).fSplitMaxMinInfoPtrs[0] : NULL, m_opType);

        if (m_opType != INSERT && cpInfo != NULL)  // we allocate space for old values only when we need them.
        {
          allocateValArray(oldValArray, totalRow1, colStructList[i].colType, colStructList[i].colWidth);
        }
        else
        {
          oldValArray = NULL;
        }

        // convert values to valArray
        // WIP Is m_opType ever set to DELETE?
        if (m_opType != DELETE)
        {
          bExcp = false;

          try
          {
            // WIP We convert values twice!?
            // dmlcommandproc converts strings to boost::any and this converts
            // into actual type value masked by *void
            // It is not clear why we need to convert to boost::any b/c we can convert from the original
            // string here
            convertValArray(totalRow1, cscColTypeList[i], colStructList[i].colType, colValueList[i],
                            valArray);
          }
          catch (...)
          {
            bExcp = true;
          }

          if (bExcp)
          {
            if (versioning)
              BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

            return ERR_PARSING;
          }

#ifdef PROFILE
          timer.start("writeRow ");
#endif
          rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray, oldValArray);
#ifdef PROFILE
          timer.stop("writeRow ");
#endif
        }
        else
        {
#ifdef PROFILE
          timer.start("writeRow ");
#endif
          rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, oldValArray, true);
#ifdef PROFILE
          timer.stop("writeRow ");
#endif
        }

        colOp->clearColumn(curCol);

        updateMaxMinRange(totalRow1, totalRow1, cscColTypeList[i], colStructList[i].colType, valArray,
                          oldValArray, cpInfo, rowIdArray[0] == 0 && m_opType == INSERT);

        if (versioning)
          BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

        if (valArray != NULL)
          free(valArray);

        if (oldValArray != NULL)
          free(oldValArray);

        // check error
        if (rc != NO_ERROR)
          break;
      }

      // Process the second batch
      valArray = NULL;
      oldValArray = NULL;

      ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)];

      // set params
      colOp->initColumn(curCol);
      colOp->setColParam(curCol, 0, newColStructList[i].colWidth, newColStructList[i].colDataType,
                         newColStructList[i].colType, newColStructList[i].dataOid,
                         newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot,
                         newColStructList[i].fColPartition, newColStructList[i].fColSegment);
      colOp->findTypeHandler(newColStructList[i].colWidth, newColStructList[i].colDataType);

      ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == newColStructList[i].fColDbRoot) &&
            (it->partNum == newColStructList[i].fColPartition) &&
            (it->segNum == newColStructList[i].fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = newColStructList[i].fColDbRoot;
        aExt.partNum = newColStructList[i].fColPartition;
        aExt.segNum = newColStructList[i].fColSegment;
        aExt.compType = newColStructList[i].fCompressionType;
        aColExtsInfo.push_back(aExt);
        aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
      }

      // Pass "false" for hdfs tmp file flag.  Since we only allow 1
      // extent per segment file (with HDFS), we can assume a second
      // extent is going to a new file (and won't need tmp file).
      rc = colOp->openColumnFile(curCol, segFile, false, IO_BUFF_SIZE);  // @bug 5572 HDFS tmp file

      if (rc != NO_ERROR)
        break;

      // handling versioning
      vector<LBIDRange> rangeList;

      if (versioning)
      {
        rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i],
                                  newColStructList[i].colWidth, totalRow2, secondPart, rangeList);

        if (rc != NO_ERROR)
        {
          if (newColStructList[i].fCompressionType == 0)
          {
            curCol.dataFile.pFile->flush();
          }

          BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
          break;
        }
      }

      ExtCPInfo* cpInfo = getCPInfoToUpdateForUpdatableType(
          newColStructList[i], maxMins ? ((*maxMins)[i]).fSplitMaxMinInfoPtrs[1] : NULL, m_opType);
      allocateValArray(valArray, totalRow2, newColStructList[i].colType, newColStructList[i].colWidth);

      if (m_opType != INSERT && cpInfo != NULL)  // we allocate space for old values only when we need them.
      {
        allocateValArray(oldValArray, totalRow1, colStructList[i].colType, colStructList[i].colWidth);
      }
      else
      {
        oldValArray = NULL;
      }

      // convert values to valArray
      if (m_opType != DELETE)
      {
        bExcp = false;

        try
        {
          convertValArray(totalRow2, cscColTypeList[i], newColStructList[i].colType, newColValueList[i],
                          valArray);
        }
        catch (...)
        {
          bExcp = true;
        }

        if (bExcp)
        {
          if (versioning)
            BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

          return ERR_PARSING;
        }

#ifdef PROFILE
        timer.start("writeRow ");
#endif
        rc = colOp->writeRow(
            curCol, totalRow2, secondPart, valArray,
            oldValArray);  // XXX: here we use secondPart array and just below we use rowIdArray. WHY???
#ifdef PROFILE
        timer.stop("writeRow ");
#endif
      }
      else
      {
#ifdef PROFILE
        timer.start("writeRow ");
#endif
        rc = colOp->writeRow(
            curCol, totalRow2, rowIdArray, valArray, oldValArray,
            true);  // XXX: BUG: here we use rowIdArray and just above we use secondPart array. WHY???
#ifdef PROFILE
        timer.stop("writeRow ");
#endif
      }

      colOp->clearColumn(curCol);

      updateMaxMinRange(totalRow2, totalRow2, cscColTypeList[i], newColStructList[i].colType, valArray,
                        oldValArray, cpInfo, secondPart[0] == 0 && m_opType == INSERT);

      if (versioning)
        BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

      if (valArray != NULL)
        free(valArray);

      // check error
      if (rc != NO_ERROR)
        break;
    }
    else
    {
      valArray = NULL;
      oldValArray = NULL;

      ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];

      ExtCPInfo* cpInfo = getCPInfoToUpdateForUpdatableType(
          colStructList[i], maxMins ? ((*maxMins)[i]).fSplitMaxMinInfoPtrs[0] : NULL, m_opType);

      // set params
      colOp->initColumn(curCol);
      colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType,
                         colStructList[i].colType, colStructList[i].dataOid,
                         colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
                         colStructList[i].fColPartition, colStructList[i].fColSegment);
      colOp->findTypeHandler(colStructList[i].colWidth, colStructList[i].colDataType);

      rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE);  // @bug 5572 HDFS tmp file

      // cout << " Opened file oid " << curCol.dataFile.pFile << endl;
      if (rc != NO_ERROR)
        break;

      ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) &&
            (it->segNum == colStructList[i].fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = colStructList[i].fColDbRoot;
        aExt.partNum = colStructList[i].fColPartition;
        aExt.segNum = colStructList[i].fColSegment;
        aExt.compType = colStructList[i].fCompressionType;
        aColExtsInfo.push_back(aExt);
        aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
      }

      // handling versioning
      vector<LBIDRange> rangeList;

      if (versioning)
      {
        rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], colStructList[i].colWidth,
                                  totalRow1, rowIdArray, rangeList);

        if (rc != NO_ERROR)
        {
          if (colStructList[i].fCompressionType == 0)
          {
            curCol.dataFile.pFile->flush();
          }

          BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
          break;
        }
      }

      allocateValArray(valArray, totalRow1, colStructList[i].colType, colStructList[i].colWidth);

      if (m_opType != INSERT && cpInfo != NULL)  // we allocate space for old values only when we need them.
      {
        allocateValArray(oldValArray, totalRow1, colStructList[i].colType, colStructList[i].colWidth);
      }
      else
      {
        oldValArray = NULL;
      }

      // convert values to valArray
      if (m_opType != DELETE)
      {
        bExcp = false;

        try
        {
          convertValArray(totalRow1, cscColTypeList[i], colStructList[i].colType, colValueList[i], valArray,
                          true);
        }
        catch (...)
        {
          bExcp = true;
        }

        if (bExcp)
        {
          if (versioning)
            BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

          return ERR_PARSING;
        }

#ifdef PROFILE
        timer.start("writeRow ");
#endif
        rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, oldValArray);
#ifdef PROFILE
        timer.stop("writeRow ");
#endif
      }
      else
      {
#ifdef PROFILE
        timer.start("writeRow ");
#endif
        rc = colOp->writeRow(curCol, totalRow1, rowIdArray, valArray, oldValArray, true);
#ifdef PROFILE
        timer.stop("writeRow ");
#endif
      }

      colOp->clearColumn(curCol);

      updateMaxMinRange(totalRow1, totalRow1, cscColTypeList[i], colStructList[i].colType, valArray,
                        oldValArray, cpInfo, rowIdArray[0] == 0 && m_opType == INSERT);

      if (versioning)
        BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

      if (valArray != NULL)
        free(valArray);

      if (oldValArray != NULL)
        free(oldValArray);

      // check error
      if (rc != NO_ERROR)
        break;
    }
  }  // end of for (i = 0

#ifdef PROFILE
  timer.finish();
#endif
  return rc;
}

int WriteEngineWrapper::writeColumnRecBinary(const TxnID& txnid, const ColStructList& colStructList,
                                             std::vector<uint64_t>& colValueList, RID* rowIdArray,
                                             const ColStructList& newColStructList,
                                             std::vector<uint64_t>& newColValueList, const int32_t tableOid,
                                             bool useTmpSuffix, bool versioning)
{
  int rc = 0;
  void* valArray = NULL;
  string segFile;
  Column curCol;
  ColStructList::size_type totalColumn;
  ColStructList::size_type i;
  size_t totalRow1, totalRow2;

  setTransId(txnid);

  totalColumn = colStructList.size();
#ifdef PROFILE
  StopWatch timer;
#endif

  totalRow1 = colValueList.size() / totalColumn;

  if (newColValueList.size() > 0)
  {
    totalRow2 = newColValueList.size() / newColStructList.size();
    totalRow1 -= totalRow2;
  }
  else
  {
    totalRow2 = 0;
  }

  // It is possible totalRow1 is zero but totalRow2 has values
  if ((totalRow1 == 0) && (totalRow2 == 0))
    return rc;

  TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);

  if (totalRow1)
  {
    valArray = malloc(sizeof(uint64_t) * totalRow1);

    for (i = 0; i < totalColumn; i++)
    {
      //@Bug 2205 Check if all rows go to the new extent
      // Write the first batch
      RID* firstPart = rowIdArray;
      ColumnOp* colOp = m_colOp[op(colStructList[i].fCompressionType)];

      // set params
      colOp->initColumn(curCol);
      // need to pass real dbRoot, partition, and segment to setColParam
      colOp->setColParam(curCol, 0, colStructList[i].colWidth, colStructList[i].colDataType,
                         colStructList[i].colType, colStructList[i].dataOid,
                         colStructList[i].fCompressionType, colStructList[i].fColDbRoot,
                         colStructList[i].fColPartition, colStructList[i].fColSegment);
      colOp->findTypeHandler(colStructList[i].colWidth, colStructList[i].colDataType);

      ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(colStructList[i].dataOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == colStructList[i].fColDbRoot) && (it->partNum == colStructList[i].fColPartition) &&
            (it->segNum == colStructList[i].fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = colStructList[i].fColDbRoot;
        aExt.partNum = colStructList[i].fColPartition;
        aExt.segNum = colStructList[i].fColSegment;
        aExt.compType = colStructList[i].fCompressionType;
        aColExtsInfo.push_back(aExt);
        aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
      }

      rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE);  // @bug 5572 HDFS tmp file

      if (rc != NO_ERROR)
        break;

      // handling versioning
      vector<LBIDRange> rangeList;

      if (versioning)
      {
        rc = processVersionBuffer(curCol.dataFile.pFile, txnid, colStructList[i], colStructList[i].colWidth,
                                  totalRow1, firstPart, rangeList);

        if (rc != NO_ERROR)
        {
          if (colStructList[i].fCompressionType == 0)
          {
            curCol.dataFile.pFile->flush();
          }

          BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
          break;
        }
      }

      // totalRow1 -= totalRow2;
      // have to init the size here
      // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
      uint8_t tmp8;
      uint16_t tmp16;
      uint32_t tmp32;

      for (size_t j = 0; j < totalRow1; j++)
      {
        uint64_t curValue = colValueList[((totalRow1 + totalRow2) * i) + j];

        switch (colStructList[i].colType)
        {
          case WriteEngine::WR_VARBINARY:  // treat same as char for now
          case WriteEngine::WR_CHAR:
          case WriteEngine::WR_BLOB:
          case WriteEngine::WR_TEXT: ((uint64_t*)valArray)[j] = curValue; break;

          case WriteEngine::WR_INT:
          case WriteEngine::WR_UINT:
          case WriteEngine::WR_MEDINT:
          case WriteEngine::WR_UMEDINT:
          case WriteEngine::WR_FLOAT:
            tmp32 = curValue;
            ((uint32_t*)valArray)[j] = tmp32;
            break;

          case WriteEngine::WR_ULONGLONG:
          case WriteEngine::WR_LONGLONG:
          case WriteEngine::WR_DOUBLE:
          case WriteEngine::WR_TOKEN: ((uint64_t*)valArray)[j] = curValue; break;

          case WriteEngine::WR_BYTE:
          case WriteEngine::WR_UBYTE:
            tmp8 = curValue;
            ((uint8_t*)valArray)[j] = tmp8;
            break;

          case WriteEngine::WR_SHORT:
          case WriteEngine::WR_USHORT:
            tmp16 = curValue;
            ((uint16_t*)valArray)[j] = tmp16;
            break;

          case WriteEngine::WR_BINARY: ((uint64_t*)valArray)[j] = curValue; break;
        }
      }

#ifdef PROFILE
      timer.start("writeRow ");
#endif
      rc = colOp->writeRow(curCol, totalRow1, firstPart, valArray);
#ifdef PROFILE
      timer.stop("writeRow ");
#endif
      colOp->closeColumnFile(curCol);

      if (versioning)
        BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

      // check error
      if (rc != NO_ERROR)
        break;

    }  // end of for (i = 0

    if (valArray != NULL)
    {
      free(valArray);
      valArray = NULL;
    }
  }

  // MCOL-1176 - Write second extent
  if (totalRow2)
  {
    valArray = malloc(sizeof(uint64_t) * totalRow2);

    for (i = 0; i < newColStructList.size(); i++)
    {
      //@Bug 2205 Check if all rows go to the new extent
      // Write the first batch
      RID* secondPart = rowIdArray + totalRow1;
      ColumnOp* colOp = m_colOp[op(newColStructList[i].fCompressionType)];

      // set params
      colOp->initColumn(curCol);
      // need to pass real dbRoot, partition, and segment to setColParam
      colOp->setColParam(curCol, 0, newColStructList[i].colWidth, newColStructList[i].colDataType,
                         newColStructList[i].colType, newColStructList[i].dataOid,
                         newColStructList[i].fCompressionType, newColStructList[i].fColDbRoot,
                         newColStructList[i].fColPartition, newColStructList[i].fColSegment);
      colOp->findTypeHandler(newColStructList[i].colWidth, newColStructList[i].colDataType);

      ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(newColStructList[i].dataOid);
      ColExtsInfo::iterator it = aColExtsInfo.begin();

      while (it != aColExtsInfo.end())
      {
        if ((it->dbRoot == newColStructList[i].fColDbRoot) &&
            (it->partNum == newColStructList[i].fColPartition) &&
            (it->segNum == colStructList[i].fColSegment))
          break;

        it++;
      }

      if (it == aColExtsInfo.end())  // add this one to the list
      {
        ColExtInfo aExt;
        aExt.dbRoot = newColStructList[i].fColDbRoot;
        aExt.partNum = newColStructList[i].fColPartition;
        aExt.segNum = newColStructList[i].fColSegment;
        aExt.compType = newColStructList[i].fCompressionType;
        aColExtsInfo.push_back(aExt);
        aTbaleMetaData->setColExtsInfo(newColStructList[i].dataOid, aColExtsInfo);
      }

      rc = colOp->openColumnFile(curCol, segFile, useTmpSuffix, IO_BUFF_SIZE);  // @bug 5572 HDFS tmp file

      if (rc != NO_ERROR)
        break;

      // handling versioning
      vector<LBIDRange> rangeList;

      if (versioning)
      {
        rc = processVersionBuffer(curCol.dataFile.pFile, txnid, newColStructList[i],
                                  newColStructList[i].colWidth, totalRow2, secondPart, rangeList);

        if (rc != NO_ERROR)
        {
          if (newColStructList[i].fCompressionType == 0)
          {
            curCol.dataFile.pFile->flush();
          }

          BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);
          break;
        }
      }

      // totalRow1 -= totalRow2;
      // have to init the size here
      // nullArray = (bool*) malloc(sizeof(bool) * totalRow);
      uint8_t tmp8;
      uint16_t tmp16;
      uint32_t tmp32;

      for (size_t j = 0; j < totalRow2; j++)
      {
        uint64_t curValue = newColValueList[(totalRow2 * i) + j];

        switch (newColStructList[i].colType)
        {
          case WriteEngine::WR_VARBINARY:  // treat same as char for now
          case WriteEngine::WR_CHAR:
          case WriteEngine::WR_BLOB:
          case WriteEngine::WR_TEXT: ((uint64_t*)valArray)[j] = curValue; break;

          case WriteEngine::WR_INT:
          case WriteEngine::WR_UINT:
          case WriteEngine::WR_MEDINT:
          case WriteEngine::WR_UMEDINT:
          case WriteEngine::WR_FLOAT:
            tmp32 = curValue;
            ((uint32_t*)valArray)[j] = tmp32;
            break;

          case WriteEngine::WR_ULONGLONG:
          case WriteEngine::WR_LONGLONG:
          case WriteEngine::WR_DOUBLE:
          case WriteEngine::WR_TOKEN: ((uint64_t*)valArray)[j] = curValue; break;

          case WriteEngine::WR_BYTE:
          case WriteEngine::WR_UBYTE:
            tmp8 = curValue;
            ((uint8_t*)valArray)[j] = tmp8;
            break;

          case WriteEngine::WR_SHORT:
          case WriteEngine::WR_USHORT:
            tmp16 = curValue;
            ((uint16_t*)valArray)[j] = tmp16;
            break;

          case WriteEngine::WR_BINARY: ((uint64_t*)valArray)[j] = curValue; break;
        }
      }

#ifdef PROFILE
      timer.start("writeRow ");
#endif
      rc = colOp->writeRow(curCol, totalRow2, secondPart, valArray);
#ifdef PROFILE
      timer.stop("writeRow ");
#endif
      colOp->closeColumnFile(curCol);

      if (versioning)
        BRMWrapper::getInstance()->writeVBEnd(txnid, rangeList);

      // check error
      if (rc != NO_ERROR)
        break;

    }  // end of for (i = 0
  }

  if (valArray != NULL)
    free(valArray);

#ifdef PROFILE
  timer.finish();
#endif
  return rc;
}

int WriteEngineWrapper::writeColumnRecUpdate(const TxnID& txnid, const CSCTypesList& cscColTypeList,
                                             const ColStructList& colStructList,
                                             const ColValueList& colValueList, vector<void*>& colOldValueList,
                                             const RIDList& ridList, const int32_t tableOid,
                                             bool convertStructFlag, ColTupleList::size_type nRows,
                                             std::vector<ExtCPInfo*>* cpInfos,
                                             bool hasAUXCol)
{
  bool bExcp;
  int rc = 0;
  void* valArray = NULL;
  void* oldValArray = NULL;
  Column curCol;
  ColStruct curColStruct;
  ColTupleList curTupleList, oldTupleList;
  ColStructList::size_type totalColumn;
  ColStructList::size_type i;
  ColTupleList::size_type totalRow;

  setTransId(txnid);
  colOldValueList.clear();
  totalColumn = colStructList.size();
  totalRow = nRows;

#ifdef PROFILE
  StopWatch timer;
#endif

  vector<LBIDRange> rangeListTot;
  std::vector<VBRange> freeList;
  vector<vector<uint32_t> > fboLists;
  vector<vector<LBIDRange> > rangeLists;
  rc = processBeginVBCopy(txnid, ((m_opType == DELETE && hasAUXCol) ? ColStructList(1, colStructList.back()) : colStructList),
                          ridList, freeList, fboLists, rangeLists, rangeListTot);

  if (rc != NO_ERROR)
  {
    if (rangeListTot.size() > 0)
      BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);

    switch (rc)
    {
      case BRM::ERR_DEADLOCK: return ERR_BRM_DEAD_LOCK;

      case BRM::ERR_VBBM_OVERFLOW: return ERR_BRM_VB_OVERFLOW;

      case BRM::ERR_NETWORK: return ERR_BRM_NETWORK;

      case BRM::ERR_READONLY: return ERR_BRM_READONLY;

      default: return ERR_BRM_BEGIN_COPY;
    }
  }

  VBRange aRange;
  uint32_t blocksProcessedThisOid = 0;
  uint32_t blocksProcessed = 0;
  std::vector<BRM::FileInfo> files;
  TableMetaData* aTbaleMetaData = TableMetaData::makeTableMetaData(tableOid);

  for (i = 0; i < totalColumn; i++)
  {
    valArray = NULL;
    oldValArray = NULL;
    curColStruct = colStructList[i];
    curTupleList = colValueList[i];  // same value for all rows
    ColumnOp* colOp = m_colOp[op(curColStruct.fCompressionType)];

    // convert column data type
    if (convertStructFlag)
      Convertor::convertColType(&curColStruct);

    // set params
    colOp->initColumn(curCol);
    colOp->setColParam(curCol, 0, curColStruct.colWidth, curColStruct.colDataType, curColStruct.colType,
                       curColStruct.dataOid, curColStruct.fCompressionType, curColStruct.fColDbRoot,
                       curColStruct.fColPartition, curColStruct.fColSegment);
    colOp->findTypeHandler(curColStruct.colWidth, curColStruct.colDataType);

    ColExtsInfo aColExtsInfo = aTbaleMetaData->getColExtsInfo(curColStruct.dataOid);
    ColExtsInfo::iterator it = aColExtsInfo.begin();

    while (it != aColExtsInfo.end())
    {
      if ((it->dbRoot == curColStruct.fColDbRoot) && (it->partNum == curColStruct.fColPartition) &&
          (it->segNum == curColStruct.fColSegment))
        break;

      it++;
    }

    if (it == aColExtsInfo.end())  // add this one to the list
    {
      ColExtInfo aExt;
      aExt.dbRoot = curColStruct.fColDbRoot;
      aExt.partNum = curColStruct.fColPartition;
      aExt.segNum = curColStruct.fColSegment;
      aExt.compType = curColStruct.fCompressionType;
      aColExtsInfo.push_back(aExt);
      aTbaleMetaData->setColExtsInfo(colStructList[i].dataOid, aColExtsInfo);
    }

    string segFile;
    bool isReadOnly = (m_opType == DELETE && hasAUXCol && (i != colStructList.size() - 1));

    rc = colOp->openColumnFile(curCol, segFile, true, IO_BUFF_SIZE, isReadOnly);  // @bug 5572 HDFS tmp file

    if (rc != NO_ERROR)
      break;

    if (curColStruct.fCompressionType == 0)
    {
      BRM::FileInfo aFile;
      aFile.oid = curColStruct.dataOid;
      aFile.partitionNum = curColStruct.fColPartition;
      aFile.dbRoot = curColStruct.fColDbRoot;
      aFile.segmentNum = curColStruct.fColSegment;
      aFile.compType = curColStruct.fCompressionType;
      files.push_back(aFile);
    }

    // handling versioning
    std::vector<VBRange> curFreeList;
    uint32_t blockUsed = 0;

    if (!idbdatafile::IDBPolicy::useHdfs())
    {
      if (rangeListTot.size() > 0 &&
          (m_opType != DELETE || !hasAUXCol || (i == colStructList.size() - 1)))
      {
        ColStructList::size_type j = i;

        if (m_opType == DELETE && hasAUXCol && (i == colStructList.size() - 1))
          j = 0;

        if (freeList[0].size >= (blocksProcessed + rangeLists[j].size()))
        {
          aRange.vbOID = freeList[0].vbOID;
          aRange.vbFBO = freeList[0].vbFBO + blocksProcessed;
          aRange.size = rangeLists[j].size();
          curFreeList.push_back(aRange);
        }
        else
        {
          aRange.vbOID = freeList[0].vbOID;
          aRange.vbFBO = freeList[0].vbFBO + blocksProcessed;
          aRange.size = freeList[0].size - blocksProcessed;
          blockUsed = aRange.size;
          curFreeList.push_back(aRange);

          if (freeList.size() > 1)
          {
            aRange.vbOID = freeList[1].vbOID;
            aRange.vbFBO = freeList[1].vbFBO + blocksProcessedThisOid;
            aRange.size = rangeLists[j].size() - blockUsed;
            curFreeList.push_back(aRange);
            blocksProcessedThisOid += aRange.size;
          }
          else
          {
            rc = 1;
            break;
          }
        }

        blocksProcessed += rangeLists[j].size();

        rc = BRMWrapper::getInstance()->writeVB(curCol.dataFile.pFile, (BRM::VER_t)txnid,
                                                curColStruct.dataOid, fboLists[j], rangeLists[j], colOp,
                                                curFreeList, curColStruct.fColDbRoot, true);
      }
    }

    if (rc != NO_ERROR)
    {
      if (curColStruct.fCompressionType == 0)
      {
        curCol.dataFile.pFile->flush();
      }

      if (rangeListTot.size() > 0)
        BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);

      break;
    }

    allocateValArray(valArray, 1, curColStruct.colType, curColStruct.colWidth);

    ExtCPInfo* cpInfo;
    cpInfo = cpInfos ? ((*cpInfos)[i]) : NULL;

    if (cpInfo)
    {
      allocateValArray(oldValArray, totalRow, curColStruct.colType, curColStruct.colWidth);
    }

    // convert values to valArray
    if (m_opType != DELETE)
    {
      bExcp = false;
      ColTuple curTuple;
      curTuple = curTupleList[0];

      try
      {
        convertValue(cscColTypeList[i], curColStruct.colType, valArray, curTuple.data);
      }
      catch (...)
      {
        bExcp = true;
      }

      if (bExcp)
      {
        if (rangeListTot.size() > 0)
          BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);

        return ERR_PARSING;
      }

#ifdef PROFILE
      timer.start("writeRow ");
#endif
      rc = colOp->writeRows(curCol, totalRow, ridList, valArray, oldValArray);
#ifdef PROFILE
      timer.stop("writeRow ");
#endif
    }
    else
    {
#ifdef PROFILE
      timer.start("writeRows ");
#endif
      if (!isReadOnly)
        rc = colOp->writeRows(curCol, totalRow, ridList, valArray, oldValArray, true);
      else
        rc = colOp->writeRowsReadOnly(curCol, totalRow, ridList, oldValArray);
#ifdef PROFILE
      timer.stop("writeRows ");
#endif
    }

    updateMaxMinRange(1, totalRow, cscColTypeList[i], curColStruct.colType,
                      m_opType == DELETE ? NULL : valArray, oldValArray, cpInfo, false);

    colOp->clearColumn(curCol, !isReadOnly);

    if (valArray != NULL)
    {
      free(valArray);
      valArray = NULL;
    }

    if (oldValArray != NULL)
    {
      free(oldValArray);
      oldValArray = NULL;
    }

    // check error
    if (rc != NO_ERROR)
      break;

  }  // end of for (i = 0)

  if ((idbdatafile::IDBPolicy::useHdfs()) && (files.size() > 0))
    cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());

  if (rangeListTot.size() > 0)
    BRMWrapper::getInstance()->writeVBEnd(txnid, rangeListTot);

  // timer.stop("Delete:writecolrec");
#ifdef PROFILE
  timer.finish();
#endif
  return rc;
}

/*@brief tokenize - return a token for a given signature and size
 */
/***********************************************************
 * DESCRIPTION:
 *  return a token for a given signature and size
 *  If it is not in the dictionary, the signature
 *  will be added to the dictionary and the index tree
 *  If it is already in the dictionary, then
 *  the token will be returned
 *  This function does not open and close files.
 *  users need to use openDctnry and CloseDctnry
 * PARAMETERS:
 *  DctnryTuple& dctnryTuple - holds the sigValue, sigSize and token
 * RETURN:
 *    NO_ERROR if success
 *    others if something wrong in inserting the value
 ***********************************************************/
int WriteEngineWrapper::tokenize(const TxnID& txnid, DctnryTuple& dctnryTuple, int ct)
{
  int cop = op(ct);
  m_dctnry[cop]->setTransId(txnid);
  return m_dctnry[cop]->updateDctnry(dctnryTuple.sigValue, dctnryTuple.sigSize, dctnryTuple.token);
}

/*@brief tokenize - return a token for a given signature and size
 *                          accept OIDs as input
 */
/***********************************************************
 * DESCRIPTION:
 *  Token for a given signature and size
 *  If it is not in the dictionary, the signature
 *  will be added to the dictionary and the index tree
 *  If it is already in the dictionary, then
 *  the token will be returned
 * PARAMETERS:
 *  DctnryTuple& dctnryTuple - holds the sigValue, sigSize and token
 *  DctnryStruct dctnryStruct- contain the 3 OID for dictionary,
 *                             tree and list.
 * RETURN:
 *    NO_ERROR if success
 *    others if something wrong in inserting the value
 ***********************************************************/
int WriteEngineWrapper::tokenize(const TxnID& txnid, DctnryStruct& dctnryStruct, DctnryTuple& dctnryTuple,
                                 bool useTmpSuffix)  // @bug 5572 HDFS tmp file
{
  // find the corresponding column segment file the token is going to be inserted.

  Dctnry* dctnry = m_dctnry[op(dctnryStruct.fCompressionType)];
  int rc = dctnry->openDctnry(dctnryStruct.dctnryOid, dctnryStruct.fColDbRoot, dctnryStruct.fColPartition,
                              dctnryStruct.fColSegment,
                              useTmpSuffix);  // @bug 5572 TBD

  if (rc != NO_ERROR)
    return rc;

  rc = tokenize(txnid, dctnryTuple, dctnryStruct.fCompressionType);

  int rc2 = dctnry->closeDctnry(true);  // close file, even if tokenize() fails

  if ((rc == NO_ERROR) && (rc2 != NO_ERROR))
    rc = rc2;

  return rc;
}

/***********************************************************
 * DESCRIPTION:
 *    Create column files, including data and bitmap files
 * PARAMETERS:
 *    dataOid - column data file id
 *    bitmapOid - column bitmap file id
 *    colWidth - column width
 *    dbRoot   - DBRoot where file is to be located
 *    partition - Starting partition number for segment file path
 *     segment - segment number
 *     compressionType - compression type
 * RETURN:
 *    NO_ERROR if success
 *    ERR_FILE_EXIST if file exists
 *    ERR_FILE_CREATE if something wrong in creating the file
 ***********************************************************/
int WriteEngineWrapper::createDctnry(const TxnID& txnid, const OID& dctnryOid, int colWidth, uint16_t dbRoot,
                                     uint32_t partiotion, uint16_t segment, int compressionType)
{
  BRM::LBID_t startLbid;
  return m_dctnry[op(compressionType)]->createDctnry(dctnryOid, colWidth, dbRoot, partiotion, segment,
                                                     startLbid);
}

int WriteEngineWrapper::convertRidToColumn(RID& rid, uint16_t& dbRoot, uint32_t& partition, uint16_t& segment,
                                           RID filesPerColumnPartition, RID extentsPerSegmentFile,
                                           RID extentRows, uint16_t startDBRoot, unsigned dbrootCnt)
{
  int rc = 0;
  partition = rid / (filesPerColumnPartition * extentsPerSegmentFile * extentRows);

  segment = (((rid % (filesPerColumnPartition * extentsPerSegmentFile * extentRows)) / extentRows)) %
            filesPerColumnPartition;

  dbRoot = ((startDBRoot - 1 + segment) % dbrootCnt) + 1;

  // Calculate the relative rid for this segment file
  RID relRidInPartition =
      rid - ((RID)partition * (RID)filesPerColumnPartition * (RID)extentsPerSegmentFile * (RID)extentRows);
  assert(relRidInPartition <= (RID)filesPerColumnPartition * (RID)extentsPerSegmentFile * (RID)extentRows);
  uint32_t numExtentsInThisPart = relRidInPartition / extentRows;
  unsigned numExtentsInThisSegPart = numExtentsInThisPart / filesPerColumnPartition;
  RID relRidInThisExtent = relRidInPartition - numExtentsInThisPart * extentRows;
  rid = relRidInThisExtent + numExtentsInThisSegPart * extentRows;
  return rc;
}

/***********************************************************
 * DESCRIPTION:
 *    Clears table lock for the specified table lock ID.
 * PARAMETERS:
 *    lockID - table lock to be released
 *    errMsg - if error occurs, this is the return error message
 * RETURN:
 *    NO_ERROR if operation is successful
 ***********************************************************/
int WriteEngineWrapper::clearTableLockOnly(uint64_t lockID, std::string& errMsg)
{
  bool bReleased;

  int rc = BRMWrapper::getInstance()->releaseTableLock(lockID, bReleased, errMsg);

  return rc;
}

/***********************************************************
 * DESCRIPTION:
 *    Rolls back the state of the extentmap and database files for the
 *    specified table OID, using the metadata previously saved to disk.
 *    Also clears the table lock for the specified table OID.
 * PARAMETERS:
 *    tableOid - table OID to be rolled back
 *    lockID   - table lock corresponding to tableOid
 *    tableName - table name associated with tableOid
 *    applName - application that is driving this bulk rollback
 *    debugConsole - enable debug logging to the console
 *    errorMsg - error message explaining any rollback failure
 * RETURN:
 *    NO_ERROR if rollback completed succesfully
 ***********************************************************/
int WriteEngineWrapper::bulkRollback(OID tableOid, uint64_t lockID, const std::string& tableName,
                                     const std::string& applName, bool debugConsole, string& errorMsg)
{
  errorMsg.clear();

  BulkRollbackMgr rollbackMgr(tableOid, lockID, tableName, applName);

  if (debugConsole)
    rollbackMgr.setDebugConsole(true);

  // We used to pass "false" to not keep (delete) the metafiles at the end of
  // the rollback.  But after the transition to sharedNothing, we pass "true"
  // to initially keep these files.  The metafiles are deleted later, only
  // after all the distributed bulk rollbacks are successfully completed.
  int rc = rollbackMgr.rollback(true);

  if (rc != NO_ERROR)
    errorMsg = rollbackMgr.getErrorMsg();

  // Ignore the return code for now; more important to base rc on the
  // success or failure of the previous work
  BRMWrapper::getInstance()->takeSnapshot();

  return rc;
}

int WriteEngineWrapper::rollbackCommon(const TxnID& txnid, int sessionId)
{
  // Remove the unwanted tmp files and recover compressed chunks.
  string prefix;

  // BUG 4312
  RemoveTxnFromLBIDMap(txnid);
  RemoveTxnFromDictMap(txnid);

  config::Config* config = config::Config::makeConfig();
  prefix = config->getConfig("SystemConfig", "DBRMRoot");

  if (prefix.length() == 0)
  {
    cerr << "Need a valid DBRMRoot entry in Calpont configuation file";
    return ERR_INVALID_PARAM;
  }

  uint64_t pos = prefix.find_last_of("/");
  std::string aDMLLogFileName;

  if (pos != string::npos)
  {
    aDMLLogFileName = prefix.substr(0, pos + 1);  // Get the file path
  }
  else
  {
    logging::Message::Args args;
    args.add("RollbackTran cannot find the dbrm directory for the DML log file");
    SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_CRITICAL, logging::M0007);
    return ERR_OPEN_DML_LOG;
  }

  std::ostringstream oss;
  oss << txnid << "_" << Config::getLocalModuleID();
  aDMLLogFileName += "DMLLog_" + oss.str();

  if (IDBPolicy::exists(aDMLLogFileName.c_str()))
  {
    // TODO-for now the DML log file will always be in a local
    // filesystem since IDBDataFile doesn't have any support for
    // a cpp iostream interface.  need to decide if this is ok.
    boost::scoped_ptr<IDBDataFile> aDMLLogFile(IDBDataFile::open(
        IDBPolicy::getType(aDMLLogFileName.c_str(), IDBPolicy::WRITEENG), aDMLLogFileName.c_str(), "r", 0));

    if (aDMLLogFile)  // need recover
    {
      ssize_t fileSize = aDMLLogFile->size();
      boost::scoped_array<char> buf(new char[fileSize]);

      if (aDMLLogFile->read(buf.get(), fileSize) != fileSize)
        return ERR_FILE_READ;

      std::istringstream strstream(string(buf.get(), fileSize));
      std::string backUpFileType;
      std::string filename;
      int64_t size;
      int64_t offset;

      while (strstream >> backUpFileType >> filename >> size >> offset)
      {
        // cout << "Found: " <<  backUpFileType << " name " << filename << "size: " << size << " offset: " <<
        // offset << endl;
        std::ostringstream oss;
        oss << "RollbackTran found " << backUpFileType << " name " << filename << " size: " << size
            << " offset: " << offset;
        logging::Message::Args args;
        args.add(oss.str());
        SimpleSysLog::instance()->logMsg(args, logging::LOG_TYPE_INFO, logging::M0007);

        if (backUpFileType.compare("rlc") == 0)
        {
          // remove the rlc file
          filename += ".rlc";
          // cout << " File removed: " << filename << endl;
          IDBPolicy::remove(filename.c_str());
          logging::Message::Args args1;
          args1.add(filename);
          args1.add(" is removed.");
          SimpleSysLog::instance()->logMsg(args1, logging::LOG_TYPE_INFO, logging::M0007);
        }
        else if (backUpFileType.compare("tmp") == 0)
        {
          int rc = NO_ERROR;
          string orig(filename + ".orig");

          // restore the orig file
          if (IDBPolicy::exists(orig.c_str()))
          {
            // not likely both cdf and tmp exist
            if (IDBPolicy::exists(filename.c_str()) && IDBPolicy::remove(filename.c_str()) != 0)
              rc = ERR_COMP_REMOVE_FILE;

            if (rc == NO_ERROR && IDBPolicy::rename(orig.c_str(), filename.c_str()) != 0)
              rc = ERR_COMP_RENAME_FILE;
          }

          // remove the tmp file
          string tmp(filename + ".tmp");

          if (rc == NO_ERROR && IDBPolicy::exists(tmp.c_str()) && IDBPolicy::remove(tmp.c_str()) != 0)
            rc = ERR_COMP_REMOVE_FILE;

          // remove the chunk shifting helper
          string rlc(filename + ".rlc");

          if (rc == NO_ERROR && IDBPolicy::exists(rlc.c_str()) && IDBPolicy::remove(rlc.c_str()) != 0)
            rc = ERR_COMP_REMOVE_FILE;

          logging::Message::Args args1;
          args1.add(filename);

          if (rc == NO_ERROR)
          {
            args1.add(" is restored.");
            SimpleSysLog::instance()->logMsg(args1, logging::LOG_TYPE_INFO, logging::M0007);
          }
          else
          {
            args1.add(" may not restored: ");
            args1.add(rc);
            SimpleSysLog::instance()->logMsg(args1, logging::LOG_TYPE_CRITICAL, logging::M0007);

            return rc;
          }
        }
        else
        {
          // copy back to the data file
          std::string backFileName(filename);

          if (backUpFileType.compare("chk") == 0)
            backFileName += ".chk";
          else
            backFileName += ".hdr";

          // cout << "Rollback found file " << backFileName << endl;
          IDBDataFile* sourceFile = IDBDataFile::open(
              IDBPolicy::getType(backFileName.c_str(), IDBPolicy::WRITEENG), backFileName.c_str(), "r", 0);
          IDBDataFile* targetFile = IDBDataFile::open(
              IDBPolicy::getType(filename.c_str(), IDBPolicy::WRITEENG), filename.c_str(), "r+", 0);

          size_t byteRead;
          unsigned char* readBuf = new unsigned char[size];
          boost::scoped_array<unsigned char> readBufPtr(readBuf);

          if (sourceFile != NULL)
          {
            int rc = sourceFile->seek(0, 0);

            if (rc)
              return ERR_FILE_SEEK;

            byteRead = sourceFile->read(readBuf, size);

            if ((int)byteRead != size)
            {
              logging::Message::Args args6;
              args6.add("Rollback cannot read backup file ");
              args6.add(backFileName);
              SimpleSysLog::instance()->logMsg(args6, logging::LOG_TYPE_ERROR, logging::M0007);
              return ERR_FILE_READ;
            }
          }
          else
          {
            logging::Message::Args args5;
            args5.add("Rollback cannot open backup file ");
            args5.add(backFileName);
            SimpleSysLog::instance()->logMsg(args5, logging::LOG_TYPE_ERROR, logging::M0007);
            return ERR_FILE_NULL;
          }

          size_t byteWrite;

          if (targetFile != NULL)
          {
            int rc = targetFile->seek(offset, 0);

            if (rc)
              return ERR_FILE_SEEK;

            byteWrite = targetFile->write(readBuf, size);

            if ((int)byteWrite != size)
            {
              logging::Message::Args args3;
              args3.add("Rollback cannot copy to file ");
              args3.add(filename);
              args3.add("from file ");
              args3.add(backFileName);
              SimpleSysLog::instance()->logMsg(args3, logging::LOG_TYPE_ERROR, logging::M0007);

              return ERR_FILE_WRITE;
            }
          }
          else
          {
            logging::Message::Args args4;
            args4.add("Rollback cannot open target file ");
            args4.add(filename);
            SimpleSysLog::instance()->logMsg(args4, logging::LOG_TYPE_ERROR, logging::M0007);
            return ERR_FILE_NULL;
          }

          // cout << "Rollback copied to file " << filename << " from file " << backFileName << endl;

          delete targetFile;
          delete sourceFile;
          IDBPolicy::remove(backFileName.c_str());
          logging::Message::Args arg1;
          arg1.add("Rollback copied to file ");
          arg1.add(filename);
          arg1.add("from file ");
          arg1.add(backFileName);
          SimpleSysLog::instance()->logMsg(arg1, logging::LOG_TYPE_INFO, logging::M0007);
        }
      }
    }

    IDBPolicy::remove(aDMLLogFileName.c_str());
  }

  return 0;
}

int WriteEngineWrapper::rollbackTran(const TxnID& txnid, int sessionId)
{
  if (rollbackCommon(txnid, sessionId) != 0)
    return -1;

  return BRMWrapper::getInstance()->rollBack(txnid, sessionId);
}

int WriteEngineWrapper::rollbackBlocks(const TxnID& txnid, int sessionId)
{
  if (rollbackCommon(txnid, sessionId) != 0)
    return -1;

  return BRMWrapper::getInstance()->rollBackBlocks(txnid, sessionId);
}

int WriteEngineWrapper::rollbackVersion(const TxnID& txnid, int sessionId)
{
  // BUG 4312
  RemoveTxnFromLBIDMap(txnid);
  RemoveTxnFromDictMap(txnid);

  return BRMWrapper::getInstance()->rollBackVersion(txnid, sessionId);
}

int WriteEngineWrapper::updateNextValue(const TxnID txnId, const OID& columnoid, const uint64_t nextVal,
                                        const uint32_t sessionID, const uint16_t dbRoot)
{
  int rc = NO_ERROR;
  boost::shared_ptr<CalpontSystemCatalog> systemCatalogPtr;
  RIDList ridList;
  ColValueList colValueList;
  WriteEngine::ColTupleList colTuples;
  ColStructList colStructList;
  WriteEngine::CSCTypesList cscColTypeList;
  WriteEngine::ColStruct colStruct;
  CalpontSystemCatalog::ColType colType;
  colType.columnOID = colStruct.dataOid = OID_SYSCOLUMN_NEXTVALUE;
  colType.colWidth = colStruct.colWidth = 8;
  colStruct.tokenFlag = false;
  colType.colDataType = colStruct.colDataType = CalpontSystemCatalog::UBIGINT;
  colStruct.fColDbRoot = dbRoot;

  m_opType = UPDATE;

  if (idbdatafile::IDBPolicy::useHdfs())
    colStruct.fCompressionType = 2;

  colStructList.push_back(colStruct);
  cscColTypeList.push_back(colType);
  ColTuple colTuple;
  systemCatalogPtr = CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
  systemCatalogPtr->identity(CalpontSystemCatalog::EC);
  CalpontSystemCatalog::ROPair ropair;

  try
  {
    ropair = systemCatalogPtr->nextAutoIncrRid(columnoid);
  }
  catch (...)
  {
    rc = ERR_AUTOINC_RID;
  }

  if (rc != NO_ERROR)
    return rc;

  ridList.push_back(ropair.rid);
  colTuple.data = nextVal;
  colTuples.push_back(colTuple);
  colValueList.push_back(colTuples);
  rc = writeColumnRecords(txnId, cscColTypeList, colStructList, colValueList, ridList, SYSCOLUMN_BASE, false);

  if (rc != NO_ERROR)
    return rc;

  // flush PrimProc cache
  vector<LBID_t> blockList;
  BRM::LBIDRange_v lbidRanges;
  rc = BRMWrapper::getInstance()->lookupLbidRanges(OID_SYSCOLUMN_NEXTVALUE, lbidRanges);

  if (rc != NO_ERROR)
    return rc;

  LBIDRange_v::iterator it;

  for (it = lbidRanges.begin(); it != lbidRanges.end(); it++)
  {
    for (LBID_t lbid = it->start; lbid < (it->start + it->size); lbid++)
    {
      blockList.push_back(lbid);
    }
  }

  // Bug 5459 Flush FD cache
  std::vector<BRM::FileInfo> files;
  BRM::FileInfo aFile;
  aFile.oid = colStruct.dataOid;
  aFile.partitionNum = colStruct.fColPartition;
  aFile.dbRoot = colStruct.fColDbRoot;
  ;
  aFile.segmentNum = colStruct.fColSegment;
  aFile.compType = colStruct.fCompressionType;
  files.push_back(aFile);

  if (idbdatafile::IDBPolicy::useHdfs())
    cacheutils::purgePrimProcFdCache(files, Config::getLocalModuleID());

  rc = cacheutils::flushPrimProcAllverBlocks(blockList);

  if (rc != 0)
    rc = ERR_BLKCACHE_FLUSH_LIST;  // translate to WE error

  return rc;
}

/***********************************************************
 * DESCRIPTION:
 *    Flush compressed files in chunk manager
 * PARAMETERS:
 *    none
 * RETURN:
 *    none
 ***********************************************************/
int WriteEngineWrapper::flushDataFiles(int rc, const TxnID txnId, std::map<FID, FID>& columnOids)
{
  RemoveTxnFromLBIDMap(txnId);
  RemoveTxnFromDictMap(txnId);

  for (int i = 0; i < TOTAL_COMPRESS_OP; i++)
  {
    int rc1 = m_colOp[i]->flushFile(rc, columnOids);
    int rc2 = m_dctnry[i]->flushFile(rc, columnOids);

    if (rc == NO_ERROR)
    {
      rc = (rc1 != NO_ERROR) ? rc1 : rc2;
    }
  }

  return rc;
}

void WriteEngineWrapper::AddDictToList(const TxnID txnid, std::vector<BRM::LBID_t>& lbids)
{
  std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;

  mapIter = m_dictLBIDMap.find(txnid);

  if (mapIter == m_dictLBIDMap.end())
  {
    dictLBIDRec_t tempRecord;
    tempRecord.insert(lbids.begin(), lbids.end());
    m_dictLBIDMap[txnid] = tempRecord;
    return;
  }
  else
  {
    dictLBIDRec_t& txnRecord = mapIter->second;
    txnRecord.insert(lbids.begin(), lbids.end());
  }
}

// Get CPInfo for given starting LBID and column description structure.
int WriteEngineWrapper::GetLBIDRange(const BRM::LBID_t startingLBID, const ColStruct& colStruct,
                                     ExtCPInfo& cpInfo)
{
  int rtn;
  BRM::CPMaxMin maxMin;
  rtn = BRMWrapper::getInstance()->getExtentCPMaxMin(startingLBID, maxMin);
  bool isBinary = cpInfo.isBinaryColumn();
  maxMin.isBinaryColumn = isBinary;
  cpInfo.fCPInfo.firstLbid = startingLBID;
  if (rtn)
  {
    cpInfo.toInvalid();
    return rtn;
  }
  // if we are provided with CPInfo pointer to update, we record current range there.
  // please note that we may fail here for unknown extents - e.g., newly allocated ones.
  // for these we mark CPInfo as invalid (above) and proceed as usual.
  // XXX With this logic we may end with invalid ranges for extents that were
  //     allocated and recorded, yet not in our copy of extentmap.
  //     As we update only part of that extent, the recorded range will be for
  //     that part of the extent.
  //     It should be investigated whether such situation is possible.
  // XXX Please note that most if not all calls to AddLBIDToList are enclosed into
  //     RETURN_ON_ERROR() macro.
  //     If we have failed to obtain information above we will abort execution of the function
  //     that called us.
  //     This is potential source of bugs.
  cpInfo.fCPInfo.bigMax = maxMin.bigMax;
  cpInfo.fCPInfo.bigMin = maxMin.bigMin;
  cpInfo.fCPInfo.max = maxMin.max;
  cpInfo.fCPInfo.min = maxMin.min;
  cpInfo.fCPInfo.seqNum = maxMin.seqNum;
  cpInfo.fCPInfo.isBinaryColumn = maxMin.isBinaryColumn;
  return rtn;
}

/***********************************************************
 * DESCRIPTION:
 *    Add an lbid to a list of lbids for sending to markExtentsInvalid.
 *    However, rather than storing each lbid, store only unique first
 *    lbids. This is an optimization to prevent invalidating the same
 *    extents over and over.
 * PARAMETERS:
 *    txnid        - the lbid list is per txn. We use this to keep transactions
 *                   separated.
 *   These next are needed for dbrm to get the lbid:
 *    colStruct    - reference to ColStruct structure of column we record (provides segment and type).
 *    fbo          - file block offset
 *    cpInfo       - a CPInfo to collect if we need that.
 * RETURN: 0 => OK. -1 => error
 ***********************************************************/
int WriteEngineWrapper::AddLBIDtoList(const TxnID txnid, const ColStruct& colStruct, const int fbo,
                                      ExtCPInfo* cpInfo)
{
  int rtn = 0;

  BRM::LBID_t startingLBID;
  SP_TxnLBIDRec_t spTxnLBIDRec;
  std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter;

  // Find the set of extent starting LBIDs for this transaction. If not found, then create it.
  mapIter = m_txnLBIDMap.find(txnid);

  if (mapIter == m_txnLBIDMap.end())
  {
    // This is a new transaction.
    SP_TxnLBIDRec_t sptemp(new TxnLBIDRec);
    spTxnLBIDRec = sptemp;
    m_txnLBIDMap[txnid] = spTxnLBIDRec;
    //        cout << "New transaction entry " << txnid << " transaction count " << m_txnLBIDMap.size() <<
    //        endl;
  }
  else
  {
    spTxnLBIDRec = (*mapIter).second;
  }

  // Get the extent starting lbid given all these values (startingLBID is an out parameter).
  rtn = BRMWrapper::getInstance()->getStartLbid(colStruct.dataOid, colStruct.fColPartition,
                                                colStruct.fColSegment, fbo, startingLBID);

  if (rtn != 0)
    return -1;

  // if we are given the cpInfo (column's ranges can be traced), we should update it.
  if (cpInfo)
  {
    rtn = GetLBIDRange(startingLBID, colStruct, *cpInfo);
  }
  else
  {
    spTxnLBIDRec->AddLBID(startingLBID, colStruct.colDataType);
  }

  return rtn;
}

void WriteEngineWrapper::RemoveTxnFromDictMap(const TxnID txnid)
{
  std::tr1::unordered_map<TxnID, dictLBIDRec_t>::iterator mapIter;

  mapIter = m_dictLBIDMap.find(txnid);

  if (mapIter != m_dictLBIDMap.end())
  {
    m_dictLBIDMap.erase(txnid);
  }
}

int WriteEngineWrapper::markTxnExtentsAsInvalid(const TxnID txnid, bool erase)
{
  int rc = 0;
  std::tr1::unordered_map<TxnID, SP_TxnLBIDRec_t>::iterator mapIter = m_txnLBIDMap.find(txnid);
  if (mapIter != m_txnLBIDMap.end())
  {
    SP_TxnLBIDRec_t spTxnLBIDRec = (*mapIter).second;
    if (!spTxnLBIDRec->m_LBIDs.empty())
    {
      rc = BRMWrapper::getInstance()->markExtentsInvalid(spTxnLBIDRec->m_LBIDs, spTxnLBIDRec->m_ColDataTypes);
    }

    if (erase)
    {
      m_txnLBIDMap.erase(txnid);  // spTxnLBIDRec is auto-destroyed
    }
  }
  return rc;
}

/***********************************************************
 * DESCRIPTION:
 *    Remove a transaction LBID list from the LBID map
 *    Called when a transaction ends, either commit or rollback
 * PARAMETERS:
 *    txnid - the transaction to remove.
 * RETURN:
 *    error code
 ***********************************************************/
int WriteEngineWrapper::RemoveTxnFromLBIDMap(const TxnID txnid)
{
  return markTxnExtentsAsInvalid(txnid, true);
}

}  // end of namespace
